Skip to content

Commit

Permalink
feat(dbt): collect table schema metadata using the dagster dbt package
Browse files Browse the repository at this point in the history
  • Loading branch information
rexledesma committed Feb 12, 2024
1 parent 805ddf2 commit be0ea82
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import contextlib
import os
import shutil
import signal
Expand Down Expand Up @@ -43,6 +44,7 @@
from typing_extensions import Literal

from ..asset_utils import (
default_metadata_from_dbt_resource_props,
get_manifest_and_translator_from_dbt_assets,
output_name_fn,
)
Expand Down Expand Up @@ -147,10 +149,16 @@ def to_default_asset_events(
"No dbt manifest was provided. Dagster events for dbt tests will not be created."
)

unique_id: str = event_node_info["unique_id"]
invocation_id: str = self.raw_event["info"]["invocation_id"]
dbt_resource_props = manifest["nodes"][unique_id]
default_metadata = {
**default_metadata_from_dbt_resource_props(dbt_resource_props.get("dagster", {})),
"unique_id": unique_id,
"invocation_id": invocation_id,
}
has_asset_def: bool = bool(context and context.has_assets_def)

invocation_id: str = self.raw_event["info"]["invocation_id"]
unique_id: str = event_node_info["unique_id"]
node_resource_type: str = event_node_info["resource_type"]
node_status: str = event_node_info["node_status"]
node_materialization: str = self.raw_event["data"]["node_info"]["materialized"]
Expand All @@ -172,8 +180,7 @@ def to_default_asset_events(
value=None,
output_name=output_name_fn(event_node_info),
metadata={
"unique_id": unique_id,
"invocation_id": invocation_id,
**default_metadata,
"Execution Duration": duration_seconds,
**adapter_response_metadata,
},
Expand All @@ -185,8 +192,7 @@ def to_default_asset_events(
yield AssetMaterialization(
asset_key=asset_key,
metadata={
"unique_id": unique_id,
"invocation_id": invocation_id,
**default_metadata,
"Execution Duration": duration_seconds,
**adapter_response_metadata,
},
Expand All @@ -195,8 +201,7 @@ def to_default_asset_events(
upstream_unique_ids: List[str] = manifest["parent_map"][unique_id]
test_resource_props = manifest["nodes"][unique_id]
metadata = {
"unique_id": unique_id,
"invocation_id": invocation_id,
**default_metadata,
"status": node_status,
**adapter_response_metadata,
}
Expand Down Expand Up @@ -472,6 +477,20 @@ def stream_raw_events(self) -> Iterator[DbtCliEventMessage]:
if is_error_message:
self._error_messages.append(str(event))

# Attempt to parse the table schema from the event message.
# If it exists, then save it as metadata for the dbt node in the
# manifest.
if event.raw_event["info"]["name"] == "JinjaLogInfo":
unique_id: str = event.raw_event["data"]["node_info"]["unique_id"]
dbt_resource_props = self.manifest["nodes"][unique_id]

with contextlib.suppress(orjson.JSONDecodeError):
columns = orjson.loads(event.raw_event["info"]["msg"])
dbt_resource_props["dagster"] = {"columns": columns}

# Don't show this message in stdout
continue

# Only write debug logs to stdout if the user explicitly set
# the log level to debug.
if not is_debug_message or is_debug_user_log_level:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ def test_no_default_asset_events_emitted(data: dict) -> None:
},
"data": data,
}
).to_default_asset_events(manifest={})
).to_default_asset_events(manifest={"nodes": {"a.b.c": {}}})

assert list(asset_events) == []

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ venv/
env/
**/*.duckdb
**/*.duckdb.wal
package-lock.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@ clean-targets:
require-dbt-version: [">=1.0.0", "<2.0.0"]

models:
+post-hook:
- "{{ dagster.dagster__log_columns_in_relation() }}"
test_dagster_metadata:
materialized: table
staging:
materialized: view

seeds:
+post-hook:
- "{{ dagster.dagster__log_columns_in_relation() }}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
packages:
- local: "../../../packages/dagster"

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import json
import os
from pathlib import Path
from typing import Iterator

import pytest
from dagster import AssetKey, AssetMaterialization, TableColumn, TableSchema
from dagster_dbt.core.resources_v2 import DbtCliResource
from pytest_mock import MockerFixture

test_no_dagster_metadata_dbt_project_dir = (
Path(__file__).joinpath("..", "..", "dbt_projects", "test_dagster_meta_config").resolve()
)
test_no_dagster_metadata_manifest = json.loads(
test_no_dagster_metadata_dbt_project_dir.joinpath("manifest.json").read_bytes()
)

test_dagster_metadata_dbt_project_dir = (
Path(__file__).joinpath("..", "..", "dbt_projects", "test_dagster_metadata").resolve()
)
test_dagster_metadata_manifest = json.loads(
test_dagster_metadata_dbt_project_dir.joinpath("manifest.json").read_bytes()
)


@pytest.fixture(name="dbt")
def dbt_fixture() -> Iterator[DbtCliResource]:
dbt = DbtCliResource(project_dir=os.fspath(test_dagster_metadata_dbt_project_dir))

dbt.cli(["deps"]).wait()

yield dbt


def test_no_table_schema_metadata(mocker: MockerFixture) -> None:
mock_context = mocker.MagicMock()
mock_context.assets_def = None
mock_context.has_assets_def = False

dbt = DbtCliResource(project_dir=os.fspath(test_no_dagster_metadata_dbt_project_dir))

events = list(
dbt.cli(
["build"],
manifest=test_no_dagster_metadata_manifest,
context=mock_context,
).stream()
)
materializations_by_asset_key = {
dagster_event.asset_key: dagster_event
for dagster_event in events
if isinstance(dagster_event, AssetMaterialization)
}
customers_materialization = materializations_by_asset_key[AssetKey(["customers"])]

assert "table_schema" not in customers_materialization.metadata


def test_table_schema_metadata(mocker: MockerFixture, dbt: DbtCliResource) -> None:
mock_context = mocker.MagicMock()
mock_context.assets_def = None
mock_context.has_assets_def = False

events = list(
dbt.cli(
["build"],
manifest=test_dagster_metadata_manifest,
context=mock_context,
).stream()
)
materializations_by_asset_key = {
dagster_event.asset_key: dagster_event
for dagster_event in events
if isinstance(dagster_event, AssetMaterialization)
}
customers_materialization = materializations_by_asset_key[AssetKey(["customers"])]

assert (
TableSchema(
columns=[
TableColumn("customer_id", type="INTEGER"),
TableColumn("first_name", type="character varying(256)"),
TableColumn("last_name", type="character varying(256)"),
TableColumn("first_order", type="DATE"),
TableColumn("most_recent_order", type="DATE"),
TableColumn("number_of_orders", type="BIGINT"),
TableColumn("customer_lifetime_value", type="DOUBLE"),
]
)
== customers_materialization.metadata["table_schema"].value
)

0 comments on commit be0ea82

Please sign in to comment.