Skip to content

Commit

Permalink
[RFC] feat(metadata): add table spec metadata (#20597)
Browse files Browse the repository at this point in the history
## Summary & Motivation
Reopening #20477 under my name for review/approval purposes.

- Under `NamespacedMetadataEntries` (which needs to be unpluralized), add a metadata object to store information about observed table columns and those columns' dependencies.
- Allow this metadata object to be structured when served over GraphQL

## How I Tested These Changes
pytest, stacked on #20569
  • Loading branch information
rexledesma authored Mar 20, 2024
1 parent a31f7ff commit 559fd22
Show file tree
Hide file tree
Showing 16 changed files with 302 additions and 11 deletions.
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
Binary file modified docs/next/public/objects.inv
Binary file not shown.
2 changes: 2 additions & 0 deletions docs/sphinx/sections/api/apidocs/ops.rst
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ All metadata types inherit from `MetadataValue`. The following types are defined

.. autoclass:: PythonArtifactMetadataValue

.. autoclass:: TableColumnLineageMetadataValue

.. autoclass:: TableMetadataValue

.. autoclass:: TableSchemaMetadataValue
Expand Down
14 changes: 14 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 61 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from dagster._core.definitions.metadata import (
DagsterRunMetadataValue,
MetadataValue,
TableColumnLineageMetadataValue,
)
from dagster._core.events import (
DagsterEventType,
Expand All @@ -36,6 +37,8 @@
from dagster._core.execution.plan.inputs import StepInputData
from dagster._core.execution.plan.outputs import StepOutputData

from dagster_graphql.schema.metadata import GrapheneTableColumnLineageMetadataEntry

MAX_INT = 2147483647
MIN_INT = -2147483648

Expand Down Expand Up @@ -165,6 +168,11 @@ def iterate_metadata_entries(metadata: Mapping[str, MetadataValue]) -> Iterator[
columns=value.schema.columns,
),
)
elif isinstance(value, TableColumnLineageMetadataValue):
yield GrapheneTableColumnLineageMetadataEntry(
label=key,
lineage=value.column_lineage,
)
elif isinstance(value, TimestampMetadataValue):
yield GrapheneTimestampMetadataEntry(label=key, timestamp=value.value)
else:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import graphene

from .asset_key import GrapheneAssetKey
from .table import GrapheneTable, GrapheneTableSchema
from .table import GrapheneTable, GrapheneTableColumnLineage, GrapheneTableSchema


class GrapheneMetadataItemDefinition(graphene.ObjectType):
Expand Down Expand Up @@ -52,6 +52,14 @@ class Meta:
name = "TableSchemaMetadataEntry"


class GrapheneTableColumnLineageMetadataEntry(graphene.ObjectType):
lineage = graphene.NonNull(GrapheneTableColumnLineage)

class Meta:
interfaces = (GrapheneMetadataEntry,)
name = "TableColumnLineageMetadataEntry"


class GrapheneJsonMetadataEntry(graphene.ObjectType):
jsonString = graphene.NonNull(graphene.String)

Expand Down
42 changes: 42 additions & 0 deletions python_modules/dagster-graphql/dagster_graphql/schema/table.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import graphene
from dagster._core.definitions.metadata.table import TableColumnLineage

from .asset_key import GrapheneAssetKey
from .util import non_null_list


Expand Down Expand Up @@ -45,10 +47,50 @@ class Meta:
name = "Table"


class GrapheneTableColumnDep(graphene.ObjectType):
assetKey = graphene.NonNull(GrapheneAssetKey)
columnName = graphene.NonNull(graphene.String)

class Meta:
name = "TableColumnDep"


class GrapheneTableColumnLineageEntry(graphene.ObjectType):
columnName = graphene.NonNull(graphene.String)
columnDeps = non_null_list(GrapheneTableColumnDep)

class Meta:
name = "TableColumnLineageEntry"


class GrapheneTableColumnLineage(graphene.ObjectType):
entries = non_null_list(GrapheneTableColumnLineageEntry)

class Meta:
name = "TableColumnLineage"

def __init__(self, column_lineage: TableColumnLineage):
super().__init__(
entries=[
GrapheneTableColumnLineageEntry(
column_name,
[
GrapheneTableColumnDep(column_dep.asset_key, column_dep.column_name)
for column_dep in column_deps
],
)
for column_name, column_deps in column_lineage.deps_by_column.items()
]
)


types = [
GrapheneTable,
GrapheneTableSchema,
GrapheneTableColumn,
GrapheneTableColumnConstraints,
GrapheneTableConstraints,
GrapheneTableColumnDep,
GrapheneTableColumnLineageEntry,
GrapheneTableColumnLineage,
]
1 change: 1 addition & 0 deletions python_modules/dagster/dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@
NullMetadataValue as NullMetadataValue,
PathMetadataValue as PathMetadataValue,
PythonArtifactMetadataValue as PythonArtifactMetadataValue,
TableColumnLineageMetadataValue as TableColumnLineageMetadataValue,
TableMetadataValue as TableMetadataValue,
TableSchemaMetadataValue as TableSchemaMetadataValue,
TextMetadataValue as TextMetadataValue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
PythonArtifactMetadataValue as PythonArtifactMetadataValue,
TableColumn as TableColumn,
TableColumnConstraints as TableColumnConstraints,
TableColumnLineageMetadataValue as TableColumnLineageMetadataValue,
TableConstraints as TableConstraints,
TableMetadataValue as TableMetadataValue,
TableRecord as TableRecord,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Callable,
Expand All @@ -24,6 +23,7 @@
import dagster._check as check
import dagster._seven as seven
from dagster._annotations import PublicAttr, deprecated, deprecated_param, experimental, public
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.errors import DagsterInvalidMetadata
from dagster._serdes import whitelist_for_serdes
from dagster._serdes.serdes import (
Expand All @@ -41,20 +41,18 @@
from .table import ( # re-exported
TableColumn as TableColumn,
TableColumnConstraints as TableColumnConstraints,
TableColumnLineage,
TableConstraints as TableConstraints,
TableRecord as TableRecord,
TableSchema as TableSchema,
)

if TYPE_CHECKING:
from dagster._core.definitions.events import AssetKey

ArbitraryMetadataMapping: TypeAlias = Mapping[str, Any]

RawMetadataValue = Union[
"MetadataValue",
TableSchema,
"AssetKey",
AssetKey,
os.PathLike,
Dict[Any, Any],
float,
Expand Down Expand Up @@ -109,9 +107,11 @@ def normalize_metadata(
return normalized_metadata


def normalize_metadata_value(raw_value: RawMetadataValue) -> "MetadataValue[Any]":
from dagster._core.definitions.events import AssetKey
def has_corresponding_metadata_value_class(obj: Any) -> bool:
return isinstance(obj, (str, float, bool, int, list, dict, os.PathLike, AssetKey, TableSchema))


def normalize_metadata_value(raw_value: RawMetadataValue) -> "MetadataValue[Any]":
if isinstance(raw_value, MetadataValue):
return raw_value
elif isinstance(raw_value, str):
Expand All @@ -130,6 +130,8 @@ def normalize_metadata_value(raw_value: RawMetadataValue) -> "MetadataValue[Any]
return MetadataValue.asset(raw_value)
elif isinstance(raw_value, TableSchema):
return MetadataValue.table_schema(raw_value)
elif isinstance(raw_value, TableColumnLineage):
return MetadataValue.column_lineage(raw_value)
elif raw_value is None:
return MetadataValue.null()

Expand Down Expand Up @@ -574,6 +576,20 @@ def table_schema(
"""
return TableSchemaMetadataValue(schema)

@public
@staticmethod
def column_lineage(
lineage: TableColumnLineage,
) -> "TableColumnLineageMetadataValue":
"""Static constructor for a metadata value wrapping a column lineage as
:py:class:`TableColumnLineageMetadataValue`. Can be used as the value type
for the `metadata` parameter for supported events.
Args:
lineage (TableColumnLineage): The column lineage for a metadata entry.
"""
return TableColumnLineageMetadataValue(lineage)

@public
@staticmethod
def null() -> "NullMetadataValue":
Expand Down Expand Up @@ -1051,6 +1067,32 @@ def value(self) -> TableSchema:
return self.schema


@whitelist_for_serdes
class TableColumnLineageMetadataValue(
NamedTuple(
"_TableColumnLineageMetadataValue", [("column_lineage", PublicAttr[TableColumnLineage])]
),
MetadataValue[TableColumnLineage],
):
"""Representation of the lineage of column inputs to column outputs of arbitrary tabular data.
Args:
column_lineage (TableColumnLineage): The lineage of column inputs to column outputs
for the table.
"""

def __new__(cls, column_lineage: TableColumnLineage):
return super(TableColumnLineageMetadataValue, cls).__new__(
cls, check.inst_param(column_lineage, "column_lineage", TableColumnLineage)
)

@public
@property
def value(self) -> TableColumnLineage:
"""TableSpec: The wrapped :py:class:`TableSpec`."""
return self.column_lineage


@whitelist_for_serdes(storage_name="NullMetadataEntryData")
class NullMetadataValue(NamedTuple("_NullMetadataValue", []), MetadataValue[None]):
"""Representation of null."""
Expand Down Expand Up @@ -1259,9 +1301,12 @@ class TableMetadataEntries(NamespacedMetadataEntries, frozen=True):
Args:
column_schema (Optional[TableSchema]): The schema of the columns in the table.
column_lineage (Optional[TableColumnLineage]): The lineage of column inputs to column
outputs for the table.
"""

column_schema: Optional[TableSchema] = None
column_lineage: Optional[TableColumnLineage] = None

@classmethod
def namespace(cls) -> str:
Expand Down
Loading

2 comments on commit 559fd22

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-dqhf04ccl-elementl.vercel.app
https://master.dagster.dagster-docs.io

Built with commit 559fd22.
This pull request is being automatically deployed with vercel-action

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagit-core-storybook ready!

✅ Preview
https://dagit-core-storybook-nn4s39k70-elementl.vercel.app

Built with commit 559fd22.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.