Skip to content

Commit

Permalink
RFC: column lineage and pydantic metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Mar 14, 2024
1 parent e2d2d30 commit b245543
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import os
from abc import ABC, abstractmethod
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Callable,
Expand All @@ -15,13 +14,16 @@
Type,
Union,
cast,
get_args,
)

from pydantic import BaseModel
from typing_extensions import Self, TypeAlias, TypeVar

import dagster._check as check
import dagster._seven as seven
from dagster._annotations import PublicAttr, deprecated, deprecated_param, experimental, public
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.errors import DagsterInvalidMetadata
from dagster._serdes import whitelist_for_serdes
from dagster._serdes.serdes import (
Expand All @@ -39,20 +41,18 @@
from .table import ( # re-exported
TableColumn as TableColumn,
TableColumnConstraints as TableColumnConstraints,
TableColumnLineages as TableColumnLineages,
TableConstraints as TableConstraints,
TableRecord as TableRecord,
TableSchema as TableSchema,
)

if TYPE_CHECKING:
from dagster._core.definitions.events import AssetKey

ArbitraryMetadataMapping: TypeAlias = Mapping[str, Any]

RawMetadataValue = Union[
"MetadataValue",
TableSchema,
"AssetKey",
AssetKey,
os.PathLike,
Dict[Any, Any],
float,
Expand Down Expand Up @@ -106,9 +106,11 @@ def normalize_metadata(
return normalized_metadata


def normalize_metadata_value(raw_value: RawMetadataValue) -> "MetadataValue[Any]":
from dagster._core.definitions.events import AssetKey
def has_corresponding_metadata_value_class(obj: Any) -> bool:
    """Return True when ``obj`` is an instance of one of the raw value types listed below.

    As the function name indicates, these are the raw types treated as having
    a matching ``MetadataValue`` class; any other object yields False.
    """
    directly_supported_types = (
        str,
        float,
        bool,
        int,
        list,
        dict,
        os.PathLike,
        AssetKey,
        TableSchema,
    )
    return isinstance(obj, directly_supported_types)


def normalize_metadata_value(raw_value: RawMetadataValue) -> "MetadataValue[Any]":
if isinstance(raw_value, MetadataValue):
return raw_value
elif isinstance(raw_value, str):
Expand Down Expand Up @@ -1173,7 +1175,18 @@ def keys(self) -> AbstractSet[str]:

def __getitem__(self, key: str) -> Any:
    """Return the value stored under the namespaced ``key`` in a metadata-compatible form.

    Args:
        key: The namespaced metadata key; the namespace prefix is stripped to
            obtain the attribute name on this object.

    Returns:
        The attribute value unchanged when
        ``has_corresponding_metadata_value_class`` accepts it, or a
        ``JsonMetadataValue`` wrapping ``value.dict()`` when the value is a
        pydantic ``BaseModel``.

    Raises:
        DagsterInvalidMetadata: If the value is neither of the above.
    """
    # getattr returns the pydantic property on the subclass
    field_name = self._strip_namespace_from_key(key)
    value = getattr(self, field_name)

    if has_corresponding_metadata_value_class(value):
        return value
    if isinstance(value, BaseModel):
        # Pydantic models have no dedicated metadata class, so they are
        # carried as JSON built from their dict form.
        return JsonMetadataValue(data=value.dict())

    raise DagsterInvalidMetadata(
        f"Value of type '{type(value).__name__}' for metadata key '{key}' cannot be used as"
        " metadata: it has no corresponding MetadataValue class and is not a pydantic"
        " BaseModel."
    )

@classmethod
def _get_type_for_key(cls, key: str) -> Type:
    """Return the underlying type declared for the field named ``key``.

    The field annotation is read from ``cls.__fields__[key].annotation``.

    Args:
        key: Field name (without namespace) to look up in ``cls.__fields__``.

    Returns:
        The first type argument of the annotation that is not ``NoneType``
        (so ``Optional[X]`` and ``Union[None, X]`` both give ``X``). When the
        annotation has no type arguments, the annotation itself is returned.

    Raises:
        KeyError: If ``key`` is not present in ``cls.__fields__``.
    """
    annotation = cls.__fields__[key].annotation
    type_args = get_args(annotation)
    if not type_args:
        # Non-generic annotation such as a bare `X`: indexing the empty
        # argument tuple would raise IndexError, so use the annotation as-is.
        return annotation

    none_type = type(None)
    for candidate in type_args:
        if candidate is not none_type:
            return candidate
    # Every argument was NoneType; keep the first-argument behaviour.
    return type_args[0]

@classmethod
def extract(
Expand All @@ -1185,7 +1198,16 @@ def extract(
if len(splits) == 2:
namespace, key = splits
if namespace == cls.namespace() and key in cls.__fields__:
kwargs[key] = value.value if isinstance(value, MetadataValue) else value
if isinstance(value, JsonMetadataValue):
value_class = cls._get_type_for_key(key)
if issubclass(value_class, BaseModel):
kwargs[key] = value_class.parse_obj(value.data)
else:
kwargs[key] = value.data
elif isinstance(value, MetadataValue):
kwargs[key] = value.value
else:
kwargs[key] = value

return cls(**kwargs)

Expand All @@ -1199,6 +1221,7 @@ class TableMetadataEntries(NamespacedMetadataEntries, frozen=True):
"""

column_schema: Optional[TableSchema] = None
column_lineages: Optional[TableColumnLineages] = None

@classmethod
def namespace(cls) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import dagster._check as check
from dagster._annotations import PublicAttr, experimental, public
from dagster._core.events import AssetKey
from dagster._core.definitions.asset_key import AssetKey
from dagster._serdes.serdes import (
whitelist_for_serdes,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import pytest
from dagster import AssetMaterialization, TableColumn, TableSchema
from dagster._core.definitions.metadata import TableMetadataEntries
from dagster import AssetKey, AssetMaterialization, JsonMetadataValue, TableColumn, TableSchema
from dagster._core.definitions.metadata import (
TableMetadataEntries,
)
from dagster._core.definitions.metadata.table import AssetColumnDep, TableColumnLineages
from dagster._core.errors import DagsterInvalidMetadata


def test_table_metadata_entries():
def test_column_schema():
column_schema = TableSchema(columns=[TableColumn("foo", "str")])
table_metadata_entries = TableMetadataEntries(column_schema=column_schema)

Expand All @@ -24,3 +27,30 @@ def test_table_metadata_entries():

assert dict(TableMetadataEntries()) == {}
assert TableMetadataEntries.extract(dict(TableMetadataEntries())) == TableMetadataEntries()


def test_column_lineage():
    """Column lineages round-trip through metadata via both ``dict()`` and ``**`` splatting.

    For each conversion, the resulting mapping must hold a single
    ``JsonMetadataValue`` under ``dagster/column_lineages``, and extracting
    from an ``AssetMaterialization`` built with that mapping must give back an
    equal ``TableColumnLineages`` instance.
    """
    column_lineages = TableColumnLineages(
        column_lineages={"foo": [AssetColumnDep(asset_key=AssetKey("abc"), column_name="id")]}
    )
    table_metadata_entries = TableMetadataEntries(column_lineages=column_lineages)
    lineage_key = "dagster/column_lineages"

    # Callables rather than prebuilt mappings, so each conversion runs right
    # before its own assertions, in the same order as before.
    conversions = (dict, lambda entries: {**entries})

    for to_mapping in conversions:
        as_mapping = to_mapping(table_metadata_entries)
        assert as_mapping == {lineage_key: JsonMetadataValue(column_lineages.dict())}
        assert isinstance(as_mapping[lineage_key], JsonMetadataValue)

        materialization = AssetMaterialization(asset_key="a", metadata=as_mapping)
        round_tripped = TableMetadataEntries.extract(materialization.metadata).column_lineages
        assert round_tripped == column_lineages
        assert isinstance(round_tripped, TableColumnLineages)

0 comments on commit b245543

Please sign in to comment.