Skip to content

Commit

Permalink
feat(dbt): collect table schema metadata using the dagster dbt package
Browse files Browse the repository at this point in the history
  • Loading branch information
rexledesma committed Feb 14, 2024
1 parent d378005 commit a13f1b5
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ def default_metadata_from_dbt_resource_props(
metadata: Dict[str, Any] = {}
columns = dbt_resource_props.get("columns", {})
if len(columns) > 0:
metadata["table_schema"] = MetadataValue.table_schema(
metadata["columns"] = MetadataValue.table_schema(
TableSchema(
columns=[
TableColumn(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import contextlib
import copy
import os
import shutil
import signal
Expand Down Expand Up @@ -43,6 +45,7 @@
from typing_extensions import Literal

from ..asset_utils import (
default_metadata_from_dbt_resource_props,
get_manifest_and_translator_from_dbt_assets,
output_name_fn,
)
Expand Down Expand Up @@ -80,19 +83,11 @@ class DbtCliEventMessage:
raw_event (Dict[str, Any]): The raw event dictionary.
See https://docs.getdbt.com/reference/events-logging#structured-logging for more
information.
metadata (Dict[str, Any]): A dictionary of external metadata attached to the event.
"""

raw_event: Dict[str, Any]

@classmethod
def from_log(cls, log: str) -> "DbtCliEventMessage":
"""Parse an event according to https://docs.getdbt.com/reference/events-logging#structured-logging.
We assume that the log format is json.
"""
raw_event: Dict[str, Any] = orjson.loads(log)

return cls(raw_event=raw_event)
metadata: Dict[str, Any]

def __str__(self) -> str:
return self.raw_event["info"]["msg"]
Expand Down Expand Up @@ -147,10 +142,16 @@ def to_default_asset_events(
"No dbt manifest was provided. Dagster events for dbt tests will not be created."
)

unique_id: str = event_node_info["unique_id"]
invocation_id: str = self.raw_event["info"]["invocation_id"]
dbt_resource_props = manifest["nodes"][unique_id]
default_metadata = {
**default_metadata_from_dbt_resource_props(self.metadata),
"unique_id": unique_id,
"invocation_id": invocation_id,
}
has_asset_def: bool = bool(context and context.has_assets_def)

invocation_id: str = self.raw_event["info"]["invocation_id"]
unique_id: str = event_node_info["unique_id"]
node_resource_type: str = event_node_info["resource_type"]
node_status: str = event_node_info["node_status"]
node_materialization: str = self.raw_event["data"]["node_info"]["materialized"]
Expand All @@ -172,8 +173,7 @@ def to_default_asset_events(
value=None,
output_name=output_name_fn(event_node_info),
metadata={
"unique_id": unique_id,
"invocation_id": invocation_id,
**default_metadata,
"Execution Duration": duration_seconds,
**adapter_response_metadata,
},
Expand All @@ -185,8 +185,7 @@ def to_default_asset_events(
yield AssetMaterialization(
asset_key=asset_key,
metadata={
"unique_id": unique_id,
"invocation_id": invocation_id,
**default_metadata,
"Execution Duration": duration_seconds,
**adapter_response_metadata,
},
Expand All @@ -195,8 +194,7 @@ def to_default_asset_events(
upstream_unique_ids: List[str] = manifest["parent_map"][unique_id]
test_resource_props = manifest["nodes"][unique_id]
metadata = {
"unique_id": unique_id,
"invocation_id": invocation_id,
**default_metadata,
"status": node_status,
**adapter_response_metadata,
}
Expand Down Expand Up @@ -460,9 +458,17 @@ def stream_raw_events(self) -> Iterator[DbtCliEventMessage]:
Returns:
Iterator[DbtCliEventMessage]: An iterator of events from the dbt CLI process.
"""
metadata_by_unique_id: Dict[str, Dict[str, Any]] = {}

for log in self._stdout or self._stream_stdout():
try:
event = DbtCliEventMessage.from_log(log=log)
raw_event: Dict[str, Any] = orjson.loads(log)
unique_id: Optional[str] = raw_event["data"].get("node_info", {}).get("unique_id")
metadata = (
copy.deepcopy(metadata_by_unique_id.get(unique_id, {})) if unique_id else {}
)

event = DbtCliEventMessage(raw_event=raw_event, metadata=metadata)

is_error_message = event.log_level == "error"
is_debug_message = event.log_level == "debug"
Expand All @@ -472,6 +478,16 @@ def stream_raw_events(self) -> Iterator[DbtCliEventMessage]:
if is_error_message:
self._error_messages.append(str(event))

# Attempt to parse the columns metadata from the event message.
# If it exists, then save it as metadata for the dbt node in the manifest.
if event.raw_event["info"]["name"] == "JinjaLogInfo":
with contextlib.suppress(orjson.JSONDecodeError):
columns = orjson.loads(event.raw_event["info"]["msg"])
metadata_by_unique_id[cast(str, unique_id)] = {"columns": columns}

# Don't show this message in stdout
continue

# Only write debug logs to stdout if the user explicitly set
# the log level to debug.
if not is_debug_message or is_debug_user_log_level:
Expand Down
12 changes: 10 additions & 2 deletions python_modules/libraries/dagster-dbt/dagster_dbt_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
test_asset_key_exceptions_path,
test_dbt_python_interleaving_path,
test_meta_config_path,
test_metadata_path,
)

# ======= CONFIG ========
Expand Down Expand Up @@ -93,8 +94,10 @@ def dbt_build(dbt_executable, dbt_config_dir):


def _create_dbt_manifest(project_dir: Path) -> Dict[str, Any]:
    """Compile the dbt project at ``project_dir`` and return its manifest.

    Runs ``dbt deps`` followed by ``dbt compile`` against the project, then
    returns the ``manifest.json`` artifact of the compile invocation.

    Args:
        project_dir: Path to the dbt project directory.

    Returns:
        The contents of the ``manifest.json`` artifact.
    """
    # A single resource is shared by both invocations; ``--quiet`` is supplied
    # through ``global_config_flags`` rather than repeated in each command.
    dbt = DbtCliResource(project_dir=os.fspath(project_dir), global_config_flags=["--quiet"])

    # Run ``deps`` before ``compile`` — presumably required so that packages the
    # project declares are installed before compilation; confirm per project.
    dbt.cli(["deps"]).wait()
    dbt_invocation = dbt.cli(["compile"]).wait()

    return dbt_invocation.get_artifact("manifest.json")

Expand All @@ -117,3 +120,8 @@ def test_dbt_python_interleaving_manifest_fixture() -> Dict[str, Any]:
@pytest.fixture(name="test_meta_config_manifest", scope="session")
def test_meta_config_manifest_fixture() -> Dict[str, Any]:
    """Session-scoped fixture yielding the manifest built from ``test_meta_config_path``."""
    manifest = _create_dbt_manifest(test_meta_config_path)
    return manifest


@pytest.fixture(name="test_metadata_manifest", scope="session")
def test_metadata_manifest_fixture() -> Dict[str, Any]:
    """Session-scoped fixture yielding the manifest built from ``test_metadata_path``."""
    manifest = _create_dbt_manifest(test_metadata_path)
    return manifest
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,9 @@ def test_no_default_asset_events_emitted(data: dict) -> None:
"invocation_id": "1-2-3",
},
"data": data,
}
).to_default_asset_events(manifest={})
},
metadata={},
).to_default_asset_events(manifest={"nodes": {"a.b.c": {}}})

assert list(asset_events) == []

Expand Down Expand Up @@ -531,7 +532,9 @@ def test_to_default_asset_output_events() -> None:
}

asset_events = list(
DbtCliEventMessage(raw_event=raw_event).to_default_asset_events(manifest=manifest)
DbtCliEventMessage(raw_event=raw_event, metadata={}).to_default_asset_events(
manifest=manifest
)
)

assert len(asset_events) == 1
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import os
from typing import Any, Dict, cast

from dagster import (
AssetExecutionContext,
Output,
TableColumn,
TableSchema,
materialize,
)
from dagster_dbt.asset_decorator import dbt_assets
from dagster_dbt.core.resources_v2 import DbtCliResource

from ..dbt_projects import test_meta_config_path, test_metadata_path


def test_no_columns_metadata(test_meta_config_manifest: Dict[str, Any]) -> None:
    """Check that no ``Output`` from building the meta-config project has "columns" metadata."""

    @dbt_assets(manifest=test_meta_config_manifest)
    def assert_no_columns_metadata(context: AssetExecutionContext, dbt: DbtCliResource):
        # Drain the stream into a list so the events can be inspected here and
        # still be re-emitted afterwards.
        events = list(dbt.cli(["build"], context=context).stream())

        # Index every Output event by the dbt unique_id recorded in its metadata.
        output_by_dbt_unique_id: Dict[str, Output] = {}
        for dagster_event in events:
            if not isinstance(dagster_event, Output):
                continue
            dbt_unique_id = cast(str, dagster_event.metadata["unique_id"].value)
            output_by_dbt_unique_id[dbt_unique_id] = dagster_event

        for dbt_output in output_by_dbt_unique_id.values():
            assert "columns" not in dbt_output.metadata

        yield from events

    dbt_resource = DbtCliResource(project_dir=os.fspath(test_meta_config_path))
    result = materialize(
        [assert_no_columns_metadata],
        resources={"dbt": dbt_resource},
    )

    assert result.success


def test_columns_metadata(test_metadata_manifest: Dict[str, Any]) -> None:
    """Check that every ``Output`` from building the metadata project has "columns" metadata.

    Also pins the exact table schema reported for the ``customers`` model.
    """

    @dbt_assets(manifest=test_metadata_manifest)
    def assert_columns_metadata(context: AssetExecutionContext, dbt: DbtCliResource):
        # Drain the stream into a list so the events can be inspected here and
        # still be re-emitted afterwards.
        events = list(dbt.cli(["build"], context=context).stream())

        # Index every Output event by the dbt unique_id recorded in its metadata.
        output_by_dbt_unique_id: Dict[str, Output] = {}
        for dagster_event in events:
            if not isinstance(dagster_event, Output):
                continue
            dbt_unique_id = cast(str, dagster_event.metadata["unique_id"].value)
            output_by_dbt_unique_id[dbt_unique_id] = dagster_event

        for dbt_output in output_by_dbt_unique_id.values():
            assert "columns" in dbt_output.metadata

        customers_output = output_by_dbt_unique_id["model.test_dagster_metadata.customers"]
        expected_schema = TableSchema(
            columns=[
                TableColumn("customer_id", type="INTEGER"),
                TableColumn("first_name", type="character varying(256)"),
                TableColumn("last_name", type="character varying(256)"),
                TableColumn("first_order", type="DATE"),
                TableColumn("most_recent_order", type="DATE"),
                TableColumn("number_of_orders", type="BIGINT"),
                TableColumn("customer_lifetime_value", type="DOUBLE"),
            ]
        )
        assert expected_schema == customers_output.metadata["columns"].value

        yield from events

    dbt_resource = DbtCliResource(project_dir=os.fspath(test_metadata_path))
    result = materialize(
        [assert_columns_metadata],
        resources={"dbt": dbt_resource},
    )

    assert result.success
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@
test_dbt_python_interleaving_path = projects_path.joinpath("test_dagster_dbt_python_interleaving")
test_exceptions_path = projects_path.joinpath("test_dagster_exceptions")
test_meta_config_path = projects_path.joinpath("test_dagster_meta_config")
test_metadata_path = projects_path.joinpath("test_dagster_metadata")
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ venv/
env/
**/*.duckdb
**/*.duckdb.wal
package-lock.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@ clean-targets:
require-dbt-version: [">=1.0.0", "<2.0.0"]

models:
+post-hook:
- "{{ dagster.log_columns_in_relation() }}"
test_dagster_metadata:
materialized: table
staging:
materialized: view

seeds:
+post-hook:
- "{{ dagster.log_columns_in_relation() }}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
packages:
- local: "../../../dbt_packages/dagster"

0 comments on commit a13f1b5

Please sign in to comment.