diff --git a/docs/content/integrations/dbt/reference.mdx b/docs/content/integrations/dbt/reference.mdx
index 49347366913db..cd7f4bd41b7b2 100644
--- a/docs/content/integrations/dbt/reference.mdx
+++ b/docs/content/integrations/dbt/reference.mdx
@@ -571,18 +571,18 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
---
-## Emit column schema as materialization metadata
+## Emit column-level metadata as materialization metadata
- Emitting column schema as materialization metadata is currently an
+ Emitting column-level metadata as materialization metadata is currently an
experimental feature.{" "}
{" "}
- To use this feature, you'll need to be on at least `dagster==1.6.6` and
- `dagster-dbt==0.22.6`.
+ To use this feature, you'll need to be on at least `dagster>=1.6.12` and
+ `dagster-dbt>=0.22.12`.
-Dagster allows you to emit column schema [materialization metadata](/concepts/assets/software-defined-assets#recording-materialization-metadata), which includes the column names and data types of your materialized dbt models, seeds, and snapshots.
+Dagster allows you to emit column-level metadata, like column schema and column dependencies, as [materialization metadata](/concepts/assets/software-defined-assets#recording-materialization-metadata).
With this metadata, you can view documentation in Dagster for all columns, not just columns described in your dbt project.
@@ -595,20 +595,20 @@ packages:
revision: DAGSTER_VERSION # replace with the version of `dagster` you are using.
```
-Then, enable the `dagster.log_columns_in_relation()` macro as a [post-hook](https://docs.getdbt.com/reference/resource-configs/pre-hook-post-hook) for the dbt resources that should emit column schema metadata. For example, adding the following snippet in `dbt_project.yml` enables this macro for all dbt models, seeds, and snapshots:
+Then, enable the `dagster.log_column_level_metadata()` macro as a [post-hook](https://docs.getdbt.com/reference/resource-configs/pre-hook-post-hook) for the dbt resources that should emit column schema metadata. For example, adding the following snippet in `dbt_project.yml` enables this macro for all dbt models, seeds, and snapshots:
```yaml
models:
+post-hook:
- - "{{ dagster.log_columns_in_relation() }}"
+ - "{{ dagster.log_column_level_metadata() }}"
seeds:
+post-hook:
- - "{{ dagster.log_columns_in_relation() }}"
+ - "{{ dagster.log_column_level_metadata() }}"
snapshots:
+post-hook:
- - "{{ dagster.log_columns_in_relation() }}"
+ - "{{ dagster.log_column_level_metadata() }}"
```
---
diff --git a/pyright/alt-1/requirements-pinned.txt b/pyright/alt-1/requirements-pinned.txt
index c18c8c5e6d3d7..d16bcecde108a 100644
--- a/pyright/alt-1/requirements-pinned.txt
+++ b/pyright/alt-1/requirements-pinned.txt
@@ -17,6 +17,7 @@ asn1crypto==1.5.1
astroid==3.1.0
asttokens==2.4.1
async-lru==2.0.4
+async-timeout==4.0.3
attrs==23.2.0
babel==2.14.0
backoff==2.2.1
@@ -256,6 +257,8 @@ snowflake-sqlalchemy==1.5.1
sortedcontainers==2.4.0
soupsieve==2.5
sqlalchemy==1.4.52
+sqlglot==22.3.1
+sqlglotrs==0.1.2
sqlparse==0.4.4
stack-data==0.6.3
starlette==0.37.2
diff --git a/pyright/master/requirements-pinned.txt b/pyright/master/requirements-pinned.txt
index fce163a19664b..f0595d620a286 100644
--- a/pyright/master/requirements-pinned.txt
+++ b/pyright/master/requirements-pinned.txt
@@ -35,6 +35,7 @@ asn1crypto==1.5.1
-e examples/assets_pandas_pyspark
asttokens==2.4.1
async-lru==2.0.4
+async-timeout==4.0.3
attrs==23.2.0
autodocsumm==0.2.12
autoflake==2.3.1
@@ -196,6 +197,7 @@ duckdb==0.10.1
ecdsa==0.18.0
email-validator==1.3.1
entrypoints==0.4
+exceptiongroup==1.2.0
execnet==2.0.2
executing==2.0.1
expandvars==0.12.0
@@ -509,6 +511,8 @@ sphinxcontrib-serializinghtml==1.1.10
sqlalchemy==1.4.52
sqlalchemy-jsonfield==1.0.2
sqlalchemy-utils==0.41.1
+sqlglot==22.3.1
+sqlglotrs==0.1.2
sqlparse==0.4.4
sshpubkeys==3.3.1
sshtunnel==0.4.0
diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py b/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py
index e590266c26478..977cfa91b2b5c 100644
--- a/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py
+++ b/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py
@@ -17,6 +17,8 @@
Mapping,
Optional,
Sequence,
+ Set,
+ Tuple,
Union,
cast,
)
@@ -27,10 +29,12 @@
AssetCheckResult,
AssetCheckSeverity,
AssetExecutionContext,
+ AssetKey,
AssetMaterialization,
AssetObservation,
AssetsDefinition,
ConfigurableResource,
+ JsonMetadataValue,
OpExecutionContext,
Output,
get_dagster_logger,
@@ -43,6 +47,13 @@
from dbt.version import __version__ as dbt_version
from packaging import version
from pydantic import Field, validator
+from sqlglot import (
+ MappingSchema,
+ exp,
+ parse_one,
+)
+from sqlglot.lineage import lineage
+from sqlglot.optimizer import optimize
from typing_extensions import Literal
from ..asset_utils import (
@@ -104,12 +115,18 @@ def log_level(self) -> str:
"""The log level of the event."""
return self.raw_event["info"]["level"]
+ @property
+ def has_column_lineage_metadata(self) -> bool:
+ """Whether the event has column level lineage metadata."""
+ return bool(self._event_history_metadata) and "parents" in self._event_history_metadata
+
@public
def to_default_asset_events(
self,
manifest: DbtManifestParam,
dagster_dbt_translator: DagsterDbtTranslator = DagsterDbtTranslator(),
context: Optional[OpExecutionContext] = None,
+ target_path: Optional[Path] = None,
) -> Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult]]:
"""Convert a dbt CLI event to a set of corresponding Dagster events.
@@ -117,6 +134,9 @@ def to_default_asset_events(
manifest (Union[Mapping[str, Any], str, Path]): The dbt manifest blob.
dagster_dbt_translator (DagsterDbtTranslator): Optionally, a custom translator for
linking dbt nodes to Dagster assets.
+ context (Optional[OpExecutionContext]): The execution context.
+ target_path (Optional[Path]): An explicit path to a target folder used to retrieve
+ dbt artifacts while generating events.
Returns:
Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult]]:
@@ -175,6 +195,16 @@ def to_default_asset_events(
finished_at = dateutil.parser.isoparse(event_node_info["node_finished_at"])
duration_seconds = (finished_at - started_at).total_seconds()
+ lineage_metadata = (
+ self._build_column_lineage_metadata(
+ manifest=manifest,
+ dagster_dbt_translator=dagster_dbt_translator,
+ target_path=target_path,
+ )
+ if target_path
+ else {}
+ )
+
if has_asset_def:
yield Output(
value=None,
@@ -183,6 +213,7 @@ def to_default_asset_events(
**default_metadata,
"Execution Duration": duration_seconds,
**adapter_response_metadata,
+ **lineage_metadata,
},
)
else:
@@ -195,6 +226,7 @@ def to_default_asset_events(
**default_metadata,
"Execution Duration": duration_seconds,
**adapter_response_metadata,
+ **lineage_metadata,
},
)
elif manifest and node_resource_type == NodeType.Test and is_node_finished:
@@ -286,6 +318,107 @@ def _process_adapter_response_metadata(
return processed_adapter_response
+ def _build_column_lineage_metadata(
+ self,
+ manifest: Mapping[str, Any],
+ dagster_dbt_translator: DagsterDbtTranslator,
+ target_path: Path,
+ ) -> Dict[str, Any]:
+ """Process the lineage metadata for a dbt CLI event.
+
+ Args:
+ manifest (Mapping[str, Any]): The dbt manifest blob.
+ dagster_dbt_translator (DagsterDbtTranslator): The translator for dbt nodes to Dagster assets.
+ target_path (Path): The path to the dbt target folder.
+
+ Returns:
+ Dict[str, Any]: The lineage metadata.
+ """
+ if (
+ # The dbt project name is only available from the manifest in `dbt-core>=1.6`.
+ version.parse(dbt_version) < version.parse("1.6.0")
+ # Column lineage can only be built if initial metadata is provided.
+ or not self.has_column_lineage_metadata
+ ):
+ return {}
+
+ event_node_info: Dict[str, Any] = self.raw_event["data"].get("node_info")
+ unique_id: str = event_node_info["unique_id"]
+ dbt_resource_props: Dict[str, Any] = manifest["nodes"][unique_id]
+
+ # If the unique_id is a seed, then we don't need to process lineage.
+ if unique_id.startswith("seed"):
+ return {}
+
+ # 1. Retrieve the current node's SQL file and its parents' column schemas.
+ sqlglot_mapping_schema = MappingSchema()
+ for relation_name, relation_metadata in self._event_history_metadata["parents"].items():
+ sqlglot_mapping_schema.add_table(
+ table=relation_name,
+ column_mapping={
+ column_name: column_metadata["data_type"]
+ for column_name, column_metadata in relation_metadata["columns"].items()
+ },
+ )
+
+ node_sql_path = target_path.joinpath(
+ "run", manifest["metadata"]["project_name"], dbt_resource_props["original_file_path"]
+ )
+ node_ast = parse_one(sql=node_sql_path.read_text()).expression
+ optimized_node_ast = cast(
+ exp.Query,
+ optimize(
+ node_ast,
+ schema=sqlglot_mapping_schema,
+ validate_qualify_columns=False, # Don't throw an error if we can't qualify a column without ambiguity.
+ ),
+ )
+
+ # 2. Retrieve the column names from the current node.
+ column_names = cast(exp.Query, optimized_node_ast).named_selects
+
+ # 3. For each column, retrieve its dependencies on upstream columns from direct parents.
+ # TODO: this should be refactored as a Python object that renders the JSON metadata.
+ lineage_metadata: Dict[str, List[Tuple[AssetKey, str]]] = {}
+ for column_name in column_names:
+ dbt_parent_resource_props_by_alias: Dict[str, Dict[str, Any]] = {
+ parent_dbt_resource_props["alias"]: parent_dbt_resource_props
+ for parent_dbt_resource_props in map(
+ lambda parent_unique_id: manifest["nodes"][parent_unique_id],
+ dbt_resource_props["depends_on"]["nodes"],
+ )
+ }
+
+ parent_columns: Set[Tuple[AssetKey, str]] = set()
+ for sqlglot_lineage_node in lineage(
+ column=column_name, sql=optimized_node_ast, schema=sqlglot_mapping_schema
+ ).walk():
+ column = sqlglot_lineage_node.expression.find(exp.Column)
+ if column and column.table in dbt_parent_resource_props_by_alias:
+ parent_resource_props = dbt_parent_resource_props_by_alias[column.table]
+ parent_asset_key = dagster_dbt_translator.get_asset_key(parent_resource_props)
+
+ parent_columns.add((parent_asset_key, column.name))
+
+ lineage_metadata[column_name] = list(parent_columns)
+
+ # 4. Render the lineage as a JSON blob.
+ # TODO: this should just call a method on a Python object that renders the JSON metadata.
+ return {
+ "dagster/lineage": JsonMetadataValue(
+ {
+ column_name: [
+ {
+ "upstream_asset_key": parent_asset_key,
+ "upstream_column_name": parent_column_name,
+ }
+ for parent_asset_key, parent_column_name in parent_columns
+ ]
+ for column_name, parent_columns in lineage_metadata.items()
+ }
+ )
+ }
+
@dataclass
class DbtCliInvocation:
@@ -456,6 +589,7 @@ def my_dbt_assets(context, dbt: DbtCliResource):
manifest=self.manifest,
dagster_dbt_translator=self.dagster_dbt_translator,
context=self.context,
+ target_path=self.target_path,
)
@public
@@ -489,14 +623,15 @@ def stream_raw_events(self) -> Iterator[DbtCliEventMessage]:
if is_error_message:
self._error_messages.append(str(event))
- # Attempt to parse the columns metadata from the event message.
+ # Attempt to parse the column level metadata from the event message.
# If it exists, save it as historical metadata to attach to the NodeFinished event.
if event.raw_event["info"]["name"] == "JinjaLogInfo":
with contextlib.suppress(orjson.JSONDecodeError):
- columns = orjson.loads(event.raw_event["info"]["msg"])
- event_history_metadata_by_unique_id[cast(str, unique_id)] = {
- "columns": columns
- }
+ column_level_metadata = orjson.loads(event.raw_event["info"]["msg"])
+
+ event_history_metadata_by_unique_id[cast(str, unique_id)] = (
+ column_level_metadata
+ )
# Don't show this message in stdout
continue
diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/conftest.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/conftest.py
index 35383c34736a6..d567da9637ce0 100644
--- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/conftest.py
+++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/conftest.py
@@ -54,12 +54,15 @@ def disable_openblas_threading_affinity_fixture() -> None:
os.environ["GOTOBLAS_MAIN_FREE"] = "1"
-def _create_dbt_invocation(project_dir: Path) -> DbtCliInvocation:
+def _create_dbt_invocation(project_dir: Path, build_project: bool = False) -> DbtCliInvocation:
dbt = DbtCliResource(project_dir=os.fspath(project_dir), global_config_flags=["--quiet"])
dbt.cli(["deps"]).wait()
dbt_invocation = dbt.cli(["compile"]).wait()
+ if build_project:
+ dbt.cli(["build"], raise_on_error=False).wait()
+
return dbt_invocation
@@ -83,15 +86,10 @@ def test_jaffle_shop_manifest_fixture(
@pytest.fixture(name="test_asset_checks_manifest", scope="session")
def test_asset_checks_manifest_fixture() -> Dict[str, Any]:
# Prepopulate duckdb with jaffle shop data to support testing individual asset checks.
- (
- DbtCliResource(
- project_dir=os.fspath(test_asset_checks_path), global_config_flags=["--quiet"]
- )
- .cli(["build"], raise_on_error=False)
- .wait()
- )
-
- return _create_dbt_invocation(test_asset_checks_path).get_artifact("manifest.json")
+ return _create_dbt_invocation(
+ test_asset_checks_path,
+ build_project=True,
+ ).get_artifact("manifest.json")
@pytest.fixture(name="test_asset_key_exceptions_manifest", scope="session")
@@ -138,4 +136,8 @@ def test_meta_config_manifest_fixture() -> Dict[str, Any]:
@pytest.fixture(name="test_metadata_manifest", scope="session")
def test_metadata_manifest_fixture() -> Dict[str, Any]:
- return _create_dbt_invocation(test_metadata_path).get_artifact("manifest.json")
+ # Prepopulate duckdb with jaffle shop data to support testing individual column metadata.
+ return _create_dbt_invocation(
+ test_metadata_path,
+ build_project=True,
+ ).get_artifact("manifest.json")
diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/dbt_packages/test_columns_metadata.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/dbt_packages/test_columns_metadata.py
index d6a9e4cf8322a..8ad869f4fc296 100644
--- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/dbt_packages/test_columns_metadata.py
+++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/dbt_packages/test_columns_metadata.py
@@ -1,11 +1,14 @@
import json
import os
import subprocess
-from typing import Any, Dict, cast
+from typing import Any, Dict, Optional
+import pytest
from dagster import (
AssetExecutionContext,
- Output,
+ AssetKey,
+ AssetSelection,
+ JsonMetadataValue,
TableColumn,
TableSchema,
materialize,
@@ -13,98 +16,360 @@
from dagster._core.definitions.metadata import TableMetadataEntries
from dagster_dbt.asset_decorator import dbt_assets
from dagster_dbt.core.resources_v2 import DbtCliResource
+from dbt.version import __version__ as dbt_version
+from packaging import version
from ...dbt_projects import test_jaffle_shop_path, test_metadata_path
-def test_no_columns_metadata(test_jaffle_shop_manifest: Dict[str, Any]) -> None:
+def test_no_column_schema(test_jaffle_shop_manifest: Dict[str, Any]) -> None:
@dbt_assets(manifest=test_jaffle_shop_manifest)
- def assert_no_columns_metadata(context: AssetExecutionContext, dbt: DbtCliResource):
- events = list(dbt.cli(["build"], context=context).stream())
- output_by_dbt_unique_id: Dict[str, Output] = {
- cast(str, dagster_event.metadata["unique_id"].value): dagster_event
- for dagster_event in events
- if isinstance(dagster_event, Output)
- }
-
- for output in output_by_dbt_unique_id.values():
- assert "columns" not in output.metadata
-
- yield from events
+ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
+ yield from dbt.cli(["build"], context=context).stream()
result = materialize(
- [assert_no_columns_metadata],
+ [my_dbt_assets],
resources={"dbt": DbtCliResource(project_dir=os.fspath(test_jaffle_shop_path))},
)
assert result.success
+ assert all(
+ not TableMetadataEntries.extract(event.materialization.metadata).column_schema
+ for event in result.get_asset_materialization_events()
+ )
-def test_columns_metadata(test_metadata_manifest: Dict[str, Any]) -> None:
+def test_column_schema(test_metadata_manifest: Dict[str, Any]) -> None:
@dbt_assets(manifest=test_metadata_manifest)
- def assert_columns_metadata(context: AssetExecutionContext, dbt: DbtCliResource):
- events = list(dbt.cli(["build"], context=context).stream())
- output_by_dbt_unique_id: Dict[str, Output] = {
- cast(str, dagster_event.metadata["unique_id"].value): dagster_event
- for dagster_event in events
- if isinstance(dagster_event, Output)
- }
-
- for output in output_by_dbt_unique_id.values():
- assert TableMetadataEntries.extract(output.metadata).column_schema is not None
-
- customers_output = output_by_dbt_unique_id["model.test_dagster_metadata.customers"]
- assert (
- TableSchema(
- columns=[
- TableColumn("customer_id", type="INTEGER"),
- TableColumn("first_name", type="character varying(256)"),
- TableColumn("last_name", type="character varying(256)"),
- TableColumn("first_order", type="DATE"),
- TableColumn("most_recent_order", type="DATE"),
- TableColumn("number_of_orders", type="BIGINT"),
- TableColumn("customer_lifetime_value", type="DOUBLE"),
- ]
- )
- == TableMetadataEntries.extract(customers_output.metadata).column_schema
- )
-
- yield from events
+ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
+ yield from dbt.cli(["build"], context=context).stream()
result = materialize(
- [assert_columns_metadata],
+ [my_dbt_assets],
resources={"dbt": DbtCliResource(project_dir=os.fspath(test_metadata_path))},
)
assert result.success
+ table_schema_by_asset_key = {
+ event.materialization.asset_key: TableMetadataEntries.extract(
+ event.materialization.metadata
+ ).column_schema
+ for event in result.get_asset_materialization_events()
+ if event.materialization.asset_key == AssetKey(["customers"])
+ }
+ expected_table_schema_by_asset_key = {
+ AssetKey(["customers"]): TableSchema(
+ columns=[
+ TableColumn("customer_id", type="INTEGER"),
+ TableColumn("first_name", type="character varying(256)"),
+ TableColumn("last_name", type="character varying(256)"),
+ TableColumn("first_order", type="DATE"),
+ TableColumn("most_recent_order", type="DATE"),
+ TableColumn("number_of_orders", type="BIGINT"),
+ TableColumn("customer_lifetime_value", type="DOUBLE"),
+ ]
+ ),
+ }
-def test_dbt_cli_no_jinja_log_info() -> None:
- dbt = DbtCliResource(project_dir=os.fspath(test_metadata_path))
- dbt_cli_parse_invocation = dbt.cli(["parse"])
+ assert table_schema_by_asset_key == expected_table_schema_by_asset_key
- assert dbt_cli_parse_invocation.is_successful()
- assert not any(
- event.raw_event["info"]["name"] == "JinjaLogInfo"
- for event in dbt_cli_parse_invocation.stream_raw_events()
- )
+@pytest.mark.skipif(
+ version.parse(dbt_version) < version.parse("1.6.0"),
+ reason="Retrieving the dbt project name from the manifest is only available in `dbt-core>=1.6`",
+)
+@pytest.mark.parametrize(
+ "asset_key_selection",
+ [
+ None,
+ AssetKey(["raw_customers"]),
+ AssetKey(["stg_customers"]),
+ AssetKey(["customers"]),
+ AssetKey(["select_star_customers"]),
+ ],
+ ids=[
+ "--select fqn:*",
+ "--select raw_customers",
+ "--select stg_customers",
+ "--select customers",
+ "--select select_star_customers",
+ ],
+)
+def test_lineage(
+ test_metadata_manifest: Dict[str, Any], asset_key_selection: Optional[AssetKey]
+) -> None:
+ @dbt_assets(manifest=test_metadata_manifest)
+ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
+ yield from dbt.cli(["build"], context=context).stream()
-def test_dbt_raw_cli_no_empty_jinja_log_info() -> None:
- result = subprocess.check_output(
- ["dbt", "--log-format", "json", "--no-partial-parse", "parse"],
- text=True,
- cwd=test_metadata_path,
+ result = materialize(
+ [my_dbt_assets],
+ resources={"dbt": DbtCliResource(project_dir=os.fspath(test_metadata_path))},
+ selection=asset_key_selection and AssetSelection.keys(asset_key_selection),
)
+ assert result.success
- assert not any(
- json.loads(line)["info"]["name"] == "JinjaLogInfo" for line in result.splitlines()
- )
+ lineage_metadata_by_asset_key = {
+ event.materialization.asset_key: event.materialization.metadata.get("dagster/lineage")
+ for event in result.get_asset_materialization_events()
+ }
+ expected_lineage_metadata_by_asset_key = {
+ AssetKey(["raw_payments"]): None,
+ AssetKey(["raw_customers"]): None,
+ AssetKey(["raw_orders"]): None,
+ AssetKey(["stg_customers"]): JsonMetadataValue(
+ data={
+ "customer_id": [
+ {
+ "upstream_asset_key": AssetKey(["raw_customers"]),
+ "upstream_column_name": "id",
+ }
+ ],
+ "first_name": [
+ {
+ "upstream_asset_key": AssetKey(["raw_customers"]),
+ "upstream_column_name": "first_name",
+ }
+ ],
+ "last_name": [
+ {
+ "upstream_asset_key": AssetKey(["raw_customers"]),
+ "upstream_column_name": "last_name",
+ }
+ ],
+ }
+ ),
+ AssetKey(["stg_orders"]): JsonMetadataValue(
+ data={
+ "order_id": [
+ {
+ "upstream_asset_key": AssetKey(["raw_orders"]),
+ "upstream_column_name": "id",
+ }
+ ],
+ "customer_id": [
+ {
+ "upstream_asset_key": AssetKey(["raw_orders"]),
+ "upstream_column_name": "user_id",
+ }
+ ],
+ "order_date": [
+ {
+ "upstream_asset_key": AssetKey(["raw_orders"]),
+ "upstream_column_name": "order_date",
+ }
+ ],
+ "status": [
+ {
+ "upstream_asset_key": AssetKey(["raw_orders"]),
+ "upstream_column_name": "status",
+ }
+ ],
+ }
+ ),
+ AssetKey(["stg_payments"]): JsonMetadataValue(
+ data={
+ "payment_id": [
+ {
+ "upstream_asset_key": AssetKey(["raw_payments"]),
+ "upstream_column_name": "id",
+ }
+ ],
+ "order_id": [
+ {
+ "upstream_asset_key": AssetKey(["raw_payments"]),
+ "upstream_column_name": "order_id",
+ }
+ ],
+ "payment_method": [
+ {
+ "upstream_asset_key": AssetKey(["raw_payments"]),
+ "upstream_column_name": "payment_method",
+ }
+ ],
+ "amount": [
+ {
+ "upstream_asset_key": AssetKey(["raw_payments"]),
+ "upstream_column_name": "amount",
+ }
+ ],
+ }
+ ),
+ AssetKey(["orders"]): JsonMetadataValue(
+ data={
+ "order_id": [
+ {
+ "upstream_asset_key": AssetKey(["stg_orders"]),
+ "upstream_column_name": "order_id",
+ }
+ ],
+ "customer_id": [
+ {
+ "upstream_asset_key": AssetKey(["stg_orders"]),
+ "upstream_column_name": "customer_id",
+ }
+ ],
+ "order_date": [
+ {
+ "upstream_asset_key": AssetKey(["stg_orders"]),
+ "upstream_column_name": "order_date",
+ }
+ ],
+ "status": [
+ {
+ "upstream_asset_key": AssetKey(["stg_orders"]),
+ "upstream_column_name": "status",
+ }
+ ],
+ "credit_card_amount": [
+ {
+ "upstream_asset_key": AssetKey(["stg_payments"]),
+ "upstream_column_name": "amount",
+ }
+ ],
+ "coupon_amount": [
+ {
+ "upstream_asset_key": AssetKey(["stg_payments"]),
+ "upstream_column_name": "amount",
+ }
+ ],
+ "bank_transfer_amount": [
+ {
+ "upstream_asset_key": AssetKey(["stg_payments"]),
+ "upstream_column_name": "amount",
+ }
+ ],
+ "gift_card_amount": [
+ {
+ "upstream_asset_key": AssetKey(["stg_payments"]),
+ "upstream_column_name": "amount",
+ }
+ ],
+ "amount": [
+ {
+ "upstream_asset_key": AssetKey(["stg_payments"]),
+ "upstream_column_name": "amount",
+ }
+ ],
+ }
+ ),
+ AssetKey(["customers"]): JsonMetadataValue(
+ data={
+ "customer_id": [
+ {
+ "upstream_asset_key": AssetKey(["stg_customers"]),
+ "upstream_column_name": "customer_id",
+ }
+ ],
+ "first_name": [
+ {
+ "upstream_asset_key": AssetKey(["stg_customers"]),
+ "upstream_column_name": "first_name",
+ }
+ ],
+ "last_name": [
+ {
+ "upstream_asset_key": AssetKey(["stg_customers"]),
+ "upstream_column_name": "last_name",
+ }
+ ],
+ "first_order": [
+ {
+ "upstream_asset_key": AssetKey(["stg_orders"]),
+ "upstream_column_name": "order_date",
+ }
+ ],
+ "most_recent_order": [
+ {
+ "upstream_asset_key": AssetKey(["stg_orders"]),
+ "upstream_column_name": "order_date",
+ }
+ ],
+ "number_of_orders": [
+ {
+ "upstream_asset_key": AssetKey(["stg_orders"]),
+ "upstream_column_name": "order_id",
+ }
+ ],
+ "customer_lifetime_value": [
+ {
+ "upstream_asset_key": AssetKey(["stg_payments"]),
+ "upstream_column_name": "amount",
+ }
+ ],
+ }
+ ),
+ AssetKey(["select_star_customers"]): JsonMetadataValue(
+ data={
+ "customer_id": [
+ {
+ "upstream_asset_key": AssetKey(["customers"]),
+ "upstream_column_name": "customer_id",
+ }
+ ],
+ "first_name": [
+ {
+ "upstream_asset_key": AssetKey(["customers"]),
+ "upstream_column_name": "first_name",
+ }
+ ],
+ "last_name": [
+ {
+ "upstream_asset_key": AssetKey(["customers"]),
+ "upstream_column_name": "last_name",
+ }
+ ],
+ "first_order": [
+ {
+ "upstream_asset_key": AssetKey(["customers"]),
+ "upstream_column_name": "first_order",
+ }
+ ],
+ "most_recent_order": [
+ {
+ "upstream_asset_key": AssetKey(["customers"]),
+ "upstream_column_name": "most_recent_order",
+ }
+ ],
+ "number_of_orders": [
+ {
+ "upstream_asset_key": AssetKey(["customers"]),
+ "upstream_column_name": "number_of_orders",
+ }
+ ],
+ "customer_lifetime_value": [
+ {
+ "upstream_asset_key": AssetKey(["customers"]),
+ "upstream_column_name": "customer_lifetime_value",
+ }
+ ],
+ }
+ ),
+ }
+ if asset_key_selection:
+ expected_lineage_metadata_by_asset_key = {
+ asset_key: lineage_metadata
+ for asset_key, lineage_metadata in expected_lineage_metadata_by_asset_key.items()
+ if asset_key == asset_key_selection
+ }
+ assert lineage_metadata_by_asset_key == expected_lineage_metadata_by_asset_key
-def test_dbt_raw_cli_no_jinja_log_info() -> None:
+
+@pytest.mark.parametrize(
+ "command",
+ ["parse", "build"],
+ ids=[
+ "no empty jinja log info on parse",
+ "no jinja log info on execution",
+ ],
+)
+def test_dbt_raw_cli_no_jinja_log_info(
+ test_metadata_manifest: Dict[str, Any], command: str
+) -> None:
result = subprocess.check_output(
- ["dbt", "--log-format", "json", "--no-partial-parse", "build"],
+ ["dbt", "--log-format", "json", "--no-partial-parse", command],
text=True,
cwd=test_metadata_path,
)
diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dbt_project.yml b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dbt_project.yml
index b6fe930b2d3cb..53288d76b1284 100644
--- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dbt_project.yml
+++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dbt_project.yml
@@ -21,7 +21,7 @@ require-dbt-version: [">=1.0.0", "<2.0.0"]
models:
+post-hook:
- - "{{ dagster.log_columns_in_relation() }}"
+ - "{{ dagster.log_column_level_metadata() }}"
test_dagster_metadata:
materialized: table
staging:
@@ -29,4 +29,4 @@ models:
seeds:
+post-hook:
- - "{{ dagster.log_columns_in_relation() }}"
+ - "{{ dagster.log_column_level_metadata() }}"
diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/models/select_star_customers.sql b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/models/select_star_customers.sql
new file mode 100644
index 0000000000000..52c20f845ef0d
--- /dev/null
+++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/models/select_star_customers.sql
@@ -0,0 +1 @@
+select * from {{ ref('customers') }}
diff --git a/python_modules/libraries/dagster-dbt/dbt_packages/dagster/macros/log_column_level_metadata.sql b/python_modules/libraries/dagster-dbt/dbt_packages/dagster/macros/log_column_level_metadata.sql
new file mode 100644
index 0000000000000..5fcdf70e7b6cb
--- /dev/null
+++ b/python_modules/libraries/dagster-dbt/dbt_packages/dagster/macros/log_column_level_metadata.sql
@@ -0,0 +1,63 @@
+{% macro log_column_level_metadata() %}
+ -- This macro should only be run in the context of a `dagster-dbt` invocation.
+ {%- set is_dagster_dbt_cli = env_var('DAGSTER_DBT_CLI', '') == 'true' -%}
+
+ {%- if execute and is_dagster_dbt_cli -%}
+ -- Retrieve the column metadata of the current node.
+ {%- set columns = adapter.get_columns_in_relation(this) -%}
+ {%- set column_schema = {} -%}
+
+ {% for column in columns %}
+ {%- set serializable_column = {column.name: {'data_type': column.data_type}} -%}
+ {%- set _ = column_schema.update(serializable_column) -%}
+ {%- endfor -%}
+
+ -- For column level lineage, retrieve the column metadata of the current node's parents.
+ -- The parents are defined by the current node's dbt refs and sources.
+ {%- set parent_relations = [] -%}
+
+ {%- for ref_args in model.refs -%}
+ {%- set ref_relation = ref(ref_args['name'], package=ref_args.get('package'), version=ref_args.get('version'))-%}
+ {%- set _ = parent_relations.append(ref_relation) -%}
+ {%- endfor -%}
+
+ {%- for source_args in model.sources -%}
+ {%- set source_relation = source(source_args[0], sources_args[1])-%}
+ {%- set _ = parent_relations.append(ref_relation) -%}
+ {%- endfor -%}
+
+ -- Return a structured log of
+ -- {
+ -- "relation_name": str,
+ -- "columns": {
+ -- : {
+ -- "data_type": str
+ -- }
+ -- },
+ -- "parents": {
+ -- : {
+ -- "columns": {
+ -- : {
+ -- "data_type": str
+ -- }
+ -- }
+ -- }
+ -- }
+ -- }
+ {%- set structured_log = {'relation_name': this.render(), 'columns': column_schema, 'parents': {}} -%}
+
+ {%- for parent_relation in parent_relations -%}
+ {%- set parent_relation_columns = adapter.get_columns_in_relation(parent_relation) -%}
+ {%- set parent_relation_column_schema = {} -%}
+ {%- for column in parent_relation_columns -%}
+ {%- set serializable_column = {column.name: {'data_type': column.data_type}} -%}
+ {%- set _ = parent_relation_column_schema.update(serializable_column) -%}
+ {%- endfor -%}
+
+ {%- set structured_parent_relation_metadata = {parent_relation.render(): {'columns': parent_relation_column_schema}} -%}
+ {%- set _ = structured_log['parents'].update(structured_parent_relation_metadata) -%}
+ {%- endfor -%}
+
+ {%- do log(tojson(structured_log), info=true) -%}
+ {%- endif -%}
+{% endmacro %}
diff --git a/python_modules/libraries/dagster-dbt/dbt_packages/dagster/macros/log_columns_in_relation.sql b/python_modules/libraries/dagster-dbt/dbt_packages/dagster/macros/log_columns_in_relation.sql
deleted file mode 100644
index 821a87a81bada..0000000000000
--- a/python_modules/libraries/dagster-dbt/dbt_packages/dagster/macros/log_columns_in_relation.sql
+++ /dev/null
@@ -1,14 +0,0 @@
-{% macro log_columns_in_relation() %}
- {%- set is_dagster_dbt_cli = env_var('DAGSTER_DBT_CLI', '') == 'true' -%}
- {%- set columns = adapter.get_columns_in_relation(this) -%}
- {%- set table_schema = {} -%}
-
- {% for column in columns %}
- {%- set serializable_column = {column.name: {'data_type': column.data_type}} -%}
- {%- set _ = table_schema.update(serializable_column) -%}
- {% endfor %}
-
- {% if is_dagster_dbt_cli and table_schema %}
- {% do log(tojson(table_schema), info=true) %}
- {% endif %}
-{% endmacro %}
diff --git a/python_modules/libraries/dagster-dbt/setup.py b/python_modules/libraries/dagster-dbt/setup.py
index 709124a517872..9f60431eb975a 100644
--- a/python_modules/libraries/dagster-dbt/setup.py
+++ b/python_modules/libraries/dagster-dbt/setup.py
@@ -43,6 +43,7 @@ def get_version() -> str:
"orjson",
"requests",
"rich",
+ "sqlglot[rs]",
"typer>=0.9.0",
"packaging",
],