-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(dbt): collect table schema metadata using the
dagster
dbt package
- Loading branch information
1 parent
1473acc
commit b782f6a
Showing
10 changed files
with
148 additions
and
25 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
76 changes: 76 additions & 0 deletions
76
python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_packages/test_columns_metadata.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
import os | ||
from typing import Any, Dict, cast | ||
|
||
from dagster import ( | ||
AssetExecutionContext, | ||
Output, | ||
TableColumn, | ||
TableSchema, | ||
materialize, | ||
) | ||
from dagster_dbt.asset_decorator import dbt_assets | ||
from dagster_dbt.core.resources_v2 import DbtCliResource | ||
|
||
from ..dbt_projects import test_meta_config_path, test_metadata_path | ||
|
||
|
||
def test_no_columns_metadata(test_meta_config_manifest: Dict[str, Any]) -> None: | ||
@dbt_assets(manifest=test_meta_config_manifest) | ||
def assert_no_columns_metadata(context: AssetExecutionContext, dbt: DbtCliResource): | ||
events = list(dbt.cli(["build"], context=context).stream()) | ||
output_by_dbt_unique_id: Dict[str, Output] = { | ||
cast(str, dagster_event.metadata["unique_id"].value): dagster_event | ||
for dagster_event in events | ||
if isinstance(dagster_event, Output) | ||
} | ||
|
||
for output in output_by_dbt_unique_id.values(): | ||
assert "columns" not in output.metadata | ||
|
||
yield from events | ||
|
||
result = materialize( | ||
[assert_no_columns_metadata], | ||
resources={"dbt": DbtCliResource(project_dir=os.fspath(test_meta_config_path))}, | ||
) | ||
|
||
assert result.success | ||
|
||
|
||
def test_columns_metadata(test_metadata_manifest: Dict[str, Any]) -> None: | ||
@dbt_assets(manifest=test_metadata_manifest) | ||
def assert_columns_metadata(context: AssetExecutionContext, dbt: DbtCliResource): | ||
events = list(dbt.cli(["build"], context=context).stream()) | ||
output_by_dbt_unique_id: Dict[str, Output] = { | ||
cast(str, dagster_event.metadata["unique_id"].value): dagster_event | ||
for dagster_event in events | ||
if isinstance(dagster_event, Output) | ||
} | ||
|
||
for output in output_by_dbt_unique_id.values(): | ||
assert "columns" in output.metadata | ||
|
||
customers_output = output_by_dbt_unique_id["model.test_dagster_metadata.customers"] | ||
assert ( | ||
TableSchema( | ||
columns=[ | ||
TableColumn("customer_id", type="INTEGER"), | ||
TableColumn("first_name", type="character varying(256)"), | ||
TableColumn("last_name", type="character varying(256)"), | ||
TableColumn("first_order", type="DATE"), | ||
TableColumn("most_recent_order", type="DATE"), | ||
TableColumn("number_of_orders", type="BIGINT"), | ||
TableColumn("customer_lifetime_value", type="DOUBLE"), | ||
] | ||
) | ||
== customers_output.metadata["columns"].value | ||
) | ||
|
||
yield from events | ||
|
||
result = materialize( | ||
[assert_columns_metadata], | ||
resources={"dbt": DbtCliResource(project_dir=os.fspath(test_metadata_path))}, | ||
) | ||
|
||
assert result.success |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,3 +9,4 @@ venv/ | |
env/ | ||
**/*.duckdb | ||
**/*.duckdb.wal | ||
package-lock.yml |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2 changes: 2 additions & 0 deletions
2
...braries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dependencies.yml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
packages: | ||
- local: "../../../dbt_packages/dagster" |