Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dbt): collect columns metadata using the dagster dbt package #19631

Merged
merged 2 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ def default_metadata_from_dbt_resource_props(
metadata: Dict[str, Any] = {}
columns = dbt_resource_props.get("columns", {})
if len(columns) > 0:
metadata["table_schema"] = MetadataValue.table_schema(
metadata["columns"] = MetadataValue.table_schema(
TableSchema(
columns=[
TableColumn(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import contextlib
import copy
import os
import shutil
import signal
import subprocess
import sys
import uuid
from contextlib import suppress
from dataclasses import dataclass, field
from dataclasses import InitVar, dataclass, field
from pathlib import Path
from typing import (
Any,
Expand Down Expand Up @@ -44,6 +46,7 @@

from ..asset_utils import (
dagster_name_fn,
default_metadata_from_dbt_resource_props,
get_manifest_and_translator_from_dbt_assets,
)
from ..dagster_dbt_translator import (
Expand Down Expand Up @@ -80,19 +83,15 @@ class DbtCliEventMessage:
raw_event (Dict[str, Any]): The raw event dictionary.
See https://docs.getdbt.com/reference/events-logging#structured-logging for more
information.
event_history_metadata (Dict[str, Any]): A dictionary of metadata about the
current event, gathered from previous historical events.
"""

raw_event: Dict[str, Any]
event_history_metadata: InitVar[Dict[str, Any]]

@classmethod
def from_log(cls, log: str) -> "DbtCliEventMessage":
"""Parse an event according to https://docs.getdbt.com/reference/events-logging#structured-logging.

We assume that the log format is json.
"""
raw_event: Dict[str, Any] = orjson.loads(log)

return cls(raw_event=raw_event)
def __post_init__(self, event_history_metadata: Dict[str, Any]):
self._event_history_metadata = event_history_metadata

def __str__(self) -> str:
return self.raw_event["info"]["msg"]
Expand Down Expand Up @@ -147,10 +146,16 @@ def to_default_asset_events(
"No dbt manifest was provided. Dagster events for dbt tests will not be created."
)

unique_id: str = event_node_info["unique_id"]
invocation_id: str = self.raw_event["info"]["invocation_id"]
dbt_resource_props = manifest["nodes"][unique_id]
default_metadata = {
**default_metadata_from_dbt_resource_props(self._event_history_metadata),
"unique_id": unique_id,
"invocation_id": invocation_id,
}
has_asset_def: bool = bool(context and context.has_assets_def)

invocation_id: str = self.raw_event["info"]["invocation_id"]
unique_id: str = event_node_info["unique_id"]
node_resource_type: str = event_node_info["resource_type"]
node_status: str = event_node_info["node_status"]
node_materialization: str = self.raw_event["data"]["node_info"]["materialized"]
Expand All @@ -172,8 +177,7 @@ def to_default_asset_events(
value=None,
output_name=dagster_name_fn(event_node_info),
metadata={
"unique_id": unique_id,
"invocation_id": invocation_id,
**default_metadata,
"Execution Duration": duration_seconds,
**adapter_response_metadata,
},
Expand All @@ -185,8 +189,7 @@ def to_default_asset_events(
yield AssetMaterialization(
asset_key=asset_key,
metadata={
"unique_id": unique_id,
"invocation_id": invocation_id,
**default_metadata,
"Execution Duration": duration_seconds,
**adapter_response_metadata,
},
Expand All @@ -195,8 +198,7 @@ def to_default_asset_events(
upstream_unique_ids: List[str] = manifest["parent_map"][unique_id]
test_resource_props = manifest["nodes"][unique_id]
metadata = {
"unique_id": unique_id,
"invocation_id": invocation_id,
**default_metadata,
"status": node_status,
**adapter_response_metadata,
}
Expand Down Expand Up @@ -460,9 +462,21 @@ def stream_raw_events(self) -> Iterator[DbtCliEventMessage]:
Returns:
Iterator[DbtCliEventMessage]: An iterator of events from the dbt CLI process.
"""
event_history_metadata_by_unique_id: Dict[str, Dict[str, Any]] = {}

for log in self._stdout or self._stream_stdout():
try:
event = DbtCliEventMessage.from_log(log=log)
raw_event: Dict[str, Any] = orjson.loads(log)
unique_id: Optional[str] = raw_event["data"].get("node_info", {}).get("unique_id")
event_history_metadata: Dict[str, Any] = {}
if unique_id and raw_event["info"]["name"] == "NodeFinished":
event_history_metadata = copy.deepcopy(
event_history_metadata_by_unique_id.get(unique_id, {})
)

event = DbtCliEventMessage(
raw_event=raw_event, event_history_metadata=event_history_metadata
)

is_error_message = event.log_level == "error"
is_debug_message = event.log_level == "debug"
Expand All @@ -472,6 +486,18 @@ def stream_raw_events(self) -> Iterator[DbtCliEventMessage]:
if is_error_message:
self._error_messages.append(str(event))

# Attempt to parse the columns metadata from the event message.
# If it exists, save it as historical metadata to attach to the NodeFinished event.
if event.raw_event["info"]["name"] == "JinjaLogInfo":
with contextlib.suppress(orjson.JSONDecodeError):
columns = orjson.loads(event.raw_event["info"]["msg"])
event_history_metadata_by_unique_id[cast(str, unique_id)] = {
"columns": columns
}

# Don't show this message in stdout
continue

# Only write debug logs to stdout if the user explicitly set
# the log level to debug.
if not is_debug_message or is_debug_user_log_level:
Expand Down
18 changes: 16 additions & 2 deletions python_modules/libraries/dagster-dbt/dagster_dbt_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
test_asset_key_exceptions_path,
test_dbt_alias_path,
test_dbt_python_interleaving_path,
test_jaffle_shop_path,
test_meta_config_path,
test_metadata_path,
)

# ======= CONFIG ========
Expand Down Expand Up @@ -94,12 +96,19 @@ def dbt_build(dbt_executable, dbt_config_dir):


def _create_dbt_manifest(project_dir: Path) -> Dict[str, Any]:
dbt = DbtCliResource(project_dir=os.fspath(project_dir))
dbt_invocation = dbt.cli(["--quiet", "compile"]).wait()
dbt = DbtCliResource(project_dir=os.fspath(project_dir), global_config_flags=["--quiet"])

dbt.cli(["deps"]).wait()
dbt_invocation = dbt.cli(["compile"]).wait()

return dbt_invocation.get_artifact("manifest.json")


@pytest.fixture(name="test_jaffle_shop_manifest", scope="session")
def test_jaffle_shop_manifest_fixture() -> Dict[str, Any]:
return _create_dbt_manifest(test_jaffle_shop_path)


@pytest.fixture(name="test_asset_checks_manifest", scope="session")
def test_asset_checks_manifest_fixture() -> Dict[str, Any]:
return _create_dbt_manifest(test_asset_checks_path)
Expand All @@ -123,3 +132,8 @@ def test_dbt_python_interleaving_manifest_fixture() -> Dict[str, Any]:
@pytest.fixture(name="test_meta_config_manifest", scope="session")
def test_meta_config_manifest_fixture() -> Dict[str, Any]:
return _create_dbt_manifest(test_meta_config_path)


@pytest.fixture(name="test_metadata_manifest", scope="session")
def test_metadata_manifest_fixture() -> Dict[str, Any]:
return _create_dbt_manifest(test_metadata_path)
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,9 @@ def test_no_default_asset_events_emitted(data: dict) -> None:
"invocation_id": "1-2-3",
},
"data": data,
}
).to_default_asset_events(manifest={})
},
event_history_metadata={},
).to_default_asset_events(manifest={"nodes": {"a.b.c": {}}})

assert list(asset_events) == []

Expand Down Expand Up @@ -531,7 +532,9 @@ def test_to_default_asset_output_events() -> None:
}

asset_events = list(
DbtCliEventMessage(raw_event=raw_event).to_default_asset_events(manifest=manifest)
DbtCliEventMessage(raw_event=raw_event, event_history_metadata={}).to_default_asset_events(
manifest=manifest
)
)

assert len(asset_events) == 1
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import os
from typing import Any, Dict, cast

from dagster import (
AssetExecutionContext,
Output,
TableColumn,
TableSchema,
materialize,
)
from dagster_dbt.asset_decorator import dbt_assets
from dagster_dbt.core.resources_v2 import DbtCliResource

from ..dbt_projects import test_jaffle_shop_path, test_metadata_path


def test_no_columns_metadata(test_jaffle_shop_manifest: Dict[str, Any]) -> None:
@dbt_assets(manifest=test_jaffle_shop_manifest)
def assert_no_columns_metadata(context: AssetExecutionContext, dbt: DbtCliResource):
events = list(dbt.cli(["build"], context=context).stream())
output_by_dbt_unique_id: Dict[str, Output] = {
cast(str, dagster_event.metadata["unique_id"].value): dagster_event
for dagster_event in events
if isinstance(dagster_event, Output)
}

for output in output_by_dbt_unique_id.values():
assert "columns" not in output.metadata

yield from events

result = materialize(
[assert_no_columns_metadata],
resources={"dbt": DbtCliResource(project_dir=os.fspath(test_jaffle_shop_path))},
)

assert result.success


def test_columns_metadata(test_metadata_manifest: Dict[str, Any]) -> None:
@dbt_assets(manifest=test_metadata_manifest)
def assert_columns_metadata(context: AssetExecutionContext, dbt: DbtCliResource):
events = list(dbt.cli(["build"], context=context).stream())
output_by_dbt_unique_id: Dict[str, Output] = {
cast(str, dagster_event.metadata["unique_id"].value): dagster_event
for dagster_event in events
if isinstance(dagster_event, Output)
}

for output in output_by_dbt_unique_id.values():
assert "columns" in output.metadata

customers_output = output_by_dbt_unique_id["model.test_dagster_metadata.customers"]
assert (
TableSchema(
columns=[
TableColumn("customer_id", type="INTEGER"),
TableColumn("first_name", type="character varying(256)"),
TableColumn("last_name", type="character varying(256)"),
TableColumn("first_order", type="DATE"),
TableColumn("most_recent_order", type="DATE"),
TableColumn("number_of_orders", type="BIGINT"),
TableColumn("customer_lifetime_value", type="DOUBLE"),
]
)
== customers_output.metadata["columns"].value
)

yield from events

result = materialize(
[assert_columns_metadata],
resources={"dbt": DbtCliResource(project_dir=os.fspath(test_metadata_path))},
)

assert result.success
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

projects_path = Path(__file__).joinpath("..").resolve()

test_jaffle_shop_path = projects_path.joinpath("jaffle_shop")
test_asset_checks_path = projects_path.joinpath("test_dagster_asset_checks")
test_asset_key_exceptions_path = projects_path.joinpath("test_dagster_asset_key_exceptions")
test_dbt_alias_path = projects_path.joinpath("test_dagster_dbt_alias")
test_dbt_python_interleaving_path = projects_path.joinpath("test_dagster_dbt_python_interleaving")
test_exceptions_path = projects_path.joinpath("test_dagster_exceptions")
test_meta_config_path = projects_path.joinpath("test_dagster_meta_config")
test_metadata_path = projects_path.joinpath("test_dagster_metadata")
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ venv/
env/
**/*.duckdb
**/*.duckdb.wal
package-lock.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@ clean-targets:
require-dbt-version: [">=1.0.0", "<2.0.0"]

models:
+post-hook:
- "{{ dagster.log_columns_in_relation() }}"
test_dagster_metadata:
materialized: table
staging:
materialized: view

seeds:
+post-hook:
- "{{ dagster.log_columns_in_relation() }}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# We keep use `packages.yml` for compatability with `dbt-core==1.5.*`.
# Once we remove support for that version, we should rename this file to `dependencies.yml`
packages:
- local: "../../../dbt_packages/dagster"