From a13f1b5fafee4948811dc185f5a0dc0f3866463d Mon Sep 17 00:00:00 2001 From: Rex Ledesma Date: Tue, 6 Feb 2024 12:19:46 -0500 Subject: [PATCH] feat(dbt): collect table schema metadata using the `dagster` dbt package --- .../dagster-dbt/dagster_dbt/asset_utils.py | 2 +- .../dagster_dbt/core/resources_v2.py | 54 ++++++++----- .../dagster-dbt/dagster_dbt_tests/conftest.py | 12 ++- .../core/test_resources_v2.py | 9 ++- .../dbt_packages/__init__.py | 0 .../dbt_packages/test_columns_metadata.py | 76 +++++++++++++++++++ .../dbt_projects/__init__.py | 1 + .../test_dagster_metadata/.gitignore | 1 + .../test_dagster_metadata/dbt_project.yml | 6 ++ .../test_dagster_metadata/dependencies.yml | 2 + 10 files changed, 138 insertions(+), 25 deletions(-) create mode 100644 python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_packages/__init__.py create mode 100644 python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_packages/test_columns_metadata.py create mode 100644 python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dependencies.yml diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py index ebe0ac3a9117f..6d61457865ede 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py @@ -362,7 +362,7 @@ def default_metadata_from_dbt_resource_props( metadata: Dict[str, Any] = {} columns = dbt_resource_props.get("columns", {}) if len(columns) > 0: - metadata["table_schema"] = MetadataValue.table_schema( + metadata["columns"] = MetadataValue.table_schema( TableSchema( columns=[ TableColumn( diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py b/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py index c02ae257145c6..b8dc011ad5fd4 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py @@ -1,3 +1,5 @@ +import contextlib +import copy import os import shutil import signal @@ -43,6 +45,7 @@ from typing_extensions import Literal from ..asset_utils import ( + default_metadata_from_dbt_resource_props, get_manifest_and_translator_from_dbt_assets, output_name_fn, ) @@ -80,19 +83,11 @@ class DbtCliEventMessage: raw_event (Dict[str, Any]): The raw event dictionary. See https://docs.getdbt.com/reference/events-logging#structured-logging for more information. + metadata (Dict[str, Any]): A dictionary of external metadata attached to the event. """ raw_event: Dict[str, Any] - - @classmethod - def from_log(cls, log: str) -> "DbtCliEventMessage": - """Parse an event according to https://docs.getdbt.com/reference/events-logging#structured-logging. - - We assume that the log format is json. - """ - raw_event: Dict[str, Any] = orjson.loads(log) - - return cls(raw_event=raw_event) + metadata: Dict[str, Any] def __str__(self) -> str: return self.raw_event["info"]["msg"] @@ -147,10 +142,16 @@ def to_default_asset_events( "No dbt manifest was provided. Dagster events for dbt tests will not be created." ) + unique_id: str = event_node_info["unique_id"] + invocation_id: str = self.raw_event["info"]["invocation_id"] + dbt_resource_props = manifest["nodes"][unique_id] + default_metadata = { + **default_metadata_from_dbt_resource_props(self.metadata), + "unique_id": unique_id, + "invocation_id": invocation_id, + } has_asset_def: bool = bool(context and context.has_assets_def) - invocation_id: str = self.raw_event["info"]["invocation_id"] - unique_id: str = event_node_info["unique_id"] node_resource_type: str = event_node_info["resource_type"] node_status: str = event_node_info["node_status"] node_materialization: str = self.raw_event["data"]["node_info"]["materialized"] @@ -172,8 +173,7 @@ def to_default_asset_events( value=None, output_name=output_name_fn(event_node_info), metadata={ - "unique_id": unique_id, - "invocation_id": invocation_id, + **default_metadata, "Execution Duration": duration_seconds, **adapter_response_metadata, }, @@ -185,8 +185,7 @@ def to_default_asset_events( yield AssetMaterialization( asset_key=asset_key, metadata={ - "unique_id": unique_id, - "invocation_id": invocation_id, + **default_metadata, "Execution Duration": duration_seconds, **adapter_response_metadata, }, @@ -195,8 +194,7 @@ def to_default_asset_events( upstream_unique_ids: List[str] = manifest["parent_map"][unique_id] test_resource_props = manifest["nodes"][unique_id] metadata = { - "unique_id": unique_id, - "invocation_id": invocation_id, + **default_metadata, "status": node_status, **adapter_response_metadata, } @@ -460,9 +458,17 @@ def stream_raw_events(self) -> Iterator[DbtCliEventMessage]: Returns: Iterator[DbtCliEventMessage]: An iterator of events from the dbt CLI process. """ + metadata_by_unique_id: Dict[str, Dict[str, Any]] = {} + for log in self._stdout or self._stream_stdout(): try: - event = DbtCliEventMessage.from_log(log=log) + raw_event: Dict[str, Any] = orjson.loads(log) + unique_id: Optional[str] = raw_event["data"].get("node_info", {}).get("unique_id") + metadata = ( + copy.deepcopy(metadata_by_unique_id.get(unique_id, {})) if unique_id else {} + ) + + event = DbtCliEventMessage(raw_event=raw_event, metadata=metadata) is_error_message = event.log_level == "error" is_debug_message = event.log_level == "debug" @@ -472,6 +478,16 @@ def stream_raw_events(self) -> Iterator[DbtCliEventMessage]: if is_error_message: self._error_messages.append(str(event)) + # Attempt to parse the columns metadata from the event message. + # If it exists, then save it as metadata for the dbt node in the manifest. + if event.raw_event["info"]["name"] == "JinjaLogInfo": + with contextlib.suppress(orjson.JSONDecodeError): + columns = orjson.loads(event.raw_event["info"]["msg"]) + metadata_by_unique_id[cast(str, unique_id)] = {"columns": columns} + + # Don't show this message in stdout + continue + # Only write debug logs to stdout if the user explicitly set # the log level to debug. if not is_debug_message or is_debug_user_log_level: diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/conftest.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/conftest.py index 82fcfd43d5252..8d5fc14423344 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/conftest.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/conftest.py @@ -12,6 +12,7 @@ test_asset_key_exceptions_path, test_dbt_python_interleaving_path, test_meta_config_path, + test_metadata_path, ) # ======= CONFIG ======== @@ -93,8 +94,10 @@ def dbt_build(dbt_executable, dbt_config_dir): def _create_dbt_manifest(project_dir: Path) -> Dict[str, Any]: - dbt = DbtCliResource(project_dir=os.fspath(project_dir)) - dbt_invocation = dbt.cli(["--quiet", "compile"]).wait() + dbt = DbtCliResource(project_dir=os.fspath(project_dir), global_config_flags=["--quiet"]) + + dbt.cli(["deps"]).wait() + dbt_invocation = dbt.cli(["compile"]).wait() return dbt_invocation.get_artifact("manifest.json") @@ -117,3 +120,8 @@ def test_dbt_python_interleaving_manifest_fixture() -> Dict[str, Any]: @pytest.fixture(name="test_meta_config_manifest", scope="session") def test_meta_config_manifest_fixture() -> Dict[str, Any]: return _create_dbt_manifest(test_meta_config_path) + + +@pytest.fixture(name="test_metadata_manifest", scope="session") +def test_metadata_manifest_fixture() -> Dict[str, Any]: + return _create_dbt_manifest(test_metadata_path) diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_resources_v2.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_resources_v2.py index 18e07ed58c4c5..d1c38a0f8346f 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_resources_v2.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_resources_v2.py @@ -487,8 +487,9 @@ def test_no_default_asset_events_emitted(data: dict) -> None: "invocation_id": "1-2-3", }, "data": data, - } - ).to_default_asset_events(manifest={}) + }, + metadata={}, + ).to_default_asset_events(manifest={"nodes": {"a.b.c": {}}}) assert list(asset_events) == [] @@ -531,7 +532,9 @@ def test_to_default_asset_output_events() -> None: } asset_events = list( - DbtCliEventMessage(raw_event=raw_event).to_default_asset_events(manifest=manifest) + DbtCliEventMessage(raw_event=raw_event, metadata={}).to_default_asset_events( + manifest=manifest + ) ) assert len(asset_events) == 1 diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_packages/__init__.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_packages/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_packages/test_columns_metadata.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_packages/test_columns_metadata.py new file mode 100644 index 0000000000000..c30eeaa0e6f2a --- /dev/null +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_packages/test_columns_metadata.py @@ -0,0 +1,76 @@ +import os +from typing import Any, Dict, cast + +from dagster import ( + AssetExecutionContext, + Output, + TableColumn, + TableSchema, + materialize, +) +from dagster_dbt.asset_decorator import dbt_assets +from dagster_dbt.core.resources_v2 import DbtCliResource + +from ..dbt_projects import test_meta_config_path, test_metadata_path + + +def test_no_columns_metadata(test_meta_config_manifest: Dict[str, Any]) -> None: + @dbt_assets(manifest=test_meta_config_manifest) + def assert_no_columns_metadata(context: AssetExecutionContext, dbt: DbtCliResource): + events = list(dbt.cli(["build"], context=context).stream()) + output_by_dbt_unique_id: Dict[str, Output] = { + cast(str, dagster_event.metadata["unique_id"].value): dagster_event + for dagster_event in events + if isinstance(dagster_event, Output) + } + + for output in output_by_dbt_unique_id.values(): + assert "columns" not in output.metadata + + yield from events + + result = materialize( + [assert_no_columns_metadata], + resources={"dbt": DbtCliResource(project_dir=os.fspath(test_meta_config_path))}, + ) + + assert result.success + + +def test_columns_metadata(test_metadata_manifest: Dict[str, Any]) -> None: + @dbt_assets(manifest=test_metadata_manifest) + def assert_columns_metadata(context: AssetExecutionContext, dbt: DbtCliResource): + events = list(dbt.cli(["build"], context=context).stream()) + output_by_dbt_unique_id: Dict[str, Output] = { + cast(str, dagster_event.metadata["unique_id"].value): dagster_event + for dagster_event in events + if isinstance(dagster_event, Output) + } + + for output in output_by_dbt_unique_id.values(): + assert "columns" in output.metadata + + customers_output = output_by_dbt_unique_id["model.test_dagster_metadata.customers"] + assert ( + TableSchema( + columns=[ + TableColumn("customer_id", type="INTEGER"), + TableColumn("first_name", type="character varying(256)"), + TableColumn("last_name", type="character varying(256)"), + TableColumn("first_order", type="DATE"), + TableColumn("most_recent_order", type="DATE"), + TableColumn("number_of_orders", type="BIGINT"), + TableColumn("customer_lifetime_value", type="DOUBLE"), + ] + ) + == customers_output.metadata["columns"].value + ) + + yield from events + + result = materialize( + [assert_columns_metadata], + resources={"dbt": DbtCliResource(project_dir=os.fspath(test_metadata_path))}, + ) + + assert result.success diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/__init__.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/__init__.py index 3a61fbdfe624d..df512c2c08101 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/__init__.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/__init__.py @@ -7,3 +7,4 @@ test_dbt_python_interleaving_path = projects_path.joinpath("test_dagster_dbt_python_interleaving") test_exceptions_path = projects_path.joinpath("test_dagster_exceptions") test_meta_config_path = projects_path.joinpath("test_dagster_meta_config") +test_metadata_path = projects_path.joinpath("test_dagster_metadata") diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/.gitignore b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/.gitignore index ea8029fdf7ac0..f750fbd036c08 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/.gitignore +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/.gitignore @@ -9,3 +9,4 @@ venv/ env/ **/*.duckdb **/*.duckdb.wal +package-lock.yml diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dbt_project.yml b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dbt_project.yml index 9f598429c0635..b6fe930b2d3cb 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dbt_project.yml +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dbt_project.yml @@ -20,7 +20,13 @@ clean-targets: require-dbt-version: [">=1.0.0", "<2.0.0"] models: + +post-hook: + - "{{ dagster.log_columns_in_relation() }}" test_dagster_metadata: materialized: table staging: materialized: view + +seeds: + +post-hook: + - "{{ dagster.log_columns_in_relation() }}" diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dependencies.yml b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dependencies.yml new file mode 100644 index 0000000000000..7a6056b23a8ae --- /dev/null +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dependencies.yml @@ -0,0 +1,2 @@ +packages: + - local: "../../../dbt_packages/dagster"