Skip to content

Commit

Permalink
[RFC] feat(metadata): add table spec metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
rexledesma committed Mar 20, 2024
1 parent cc2ed8e commit 7f7c500
Show file tree
Hide file tree
Showing 16 changed files with 325 additions and 11 deletions.
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
Binary file modified docs/next/public/objects.inv
Binary file not shown.
2 changes: 2 additions & 0 deletions docs/sphinx/sections/api/apidocs/ops.rst
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ All metadata types inherit from `MetadataValue`. The following types are defined

.. autoclass:: TableSchemaMetadataValue

.. autoclass:: TableSpecMetadataValue

.. autoclass:: TextMetadataValue

.. autoclass:: TimestampMetadataValue
Expand Down
14 changes: 14 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 68 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from dagster._core.definitions.metadata import (
DagsterRunMetadataValue,
MetadataValue,
TableSpecMetadataValue,
)
from dagster._core.events import (
DagsterEventType,
Expand All @@ -36,6 +37,9 @@
from dagster._core.execution.plan.inputs import StepInputData
from dagster._core.execution.plan.outputs import StepOutputData

from dagster_graphql.schema.metadata import GrapheneTableSpecMetadataEntry
from dagster_graphql.schema.table import GrapheneTableSpec

MAX_INT = 2147483647
MIN_INT = -2147483648

Expand Down Expand Up @@ -165,6 +169,11 @@ def iterate_metadata_entries(metadata: Mapping[str, MetadataValue]) -> Iterator[
columns=value.schema.columns,
),
)
elif isinstance(value, TableSpecMetadataValue):
yield GrapheneTableSpecMetadataEntry(
label=key,
spec=GrapheneTableSpec(spec=value.table_spec),
)
elif isinstance(value, TimestampMetadataValue):
yield GrapheneTimestampMetadataEntry(label=key, timestamp=value.value)
else:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import graphene

from .asset_key import GrapheneAssetKey
from .table import GrapheneTable, GrapheneTableSchema
from .table import GrapheneTable, GrapheneTableSchema, GrapheneTableSpec


class GrapheneMetadataItemDefinition(graphene.ObjectType):
Expand Down Expand Up @@ -52,6 +52,14 @@ class Meta:
name = "TableSchemaMetadataEntry"


class GrapheneTableSpecMetadataEntry(graphene.ObjectType):
spec = graphene.NonNull(GrapheneTableSpec)

class Meta:
interfaces = (GrapheneMetadataEntry,)
name = "TableSpecMetadataEntry"


class GrapheneJsonMetadataEntry(graphene.ObjectType):
jsonString = graphene.NonNull(graphene.String)

Expand Down
27 changes: 27 additions & 0 deletions python_modules/dagster-graphql/dagster_graphql/schema/table.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import graphene

from .asset_key import GrapheneAssetKey
from .util import non_null_list


Expand Down Expand Up @@ -45,10 +46,36 @@ class Meta:
name = "Table"


class GrapheneTableColumnDep(graphene.ObjectType):
assetKey = graphene.NonNull(GrapheneAssetKey)
columnName = graphene.NonNull(graphene.String)

class Meta:
name = "TableColumnDep"


class GrapheneTableColumnSpec(graphene.ObjectType):
columnName = graphene.NonNull(graphene.String)
tableColumnDeps = non_null_list(GrapheneTableColumnDep)

class Meta:
name = "TableColumnSpec"


class GrapheneTableSpec(graphene.ObjectType):
columnSpecs = graphene.NonNull(GrapheneTableColumnSpec)

class Meta:
name = "TableSpec"


types = [
GrapheneTable,
GrapheneTableSchema,
GrapheneTableColumn,
GrapheneTableColumnConstraints,
GrapheneTableConstraints,
GrapheneTableColumnDep,
GrapheneTableColumnSpec,
GrapheneTableSpec,
]
1 change: 1 addition & 0 deletions python_modules/dagster/dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@
PythonArtifactMetadataValue as PythonArtifactMetadataValue,
TableMetadataValue as TableMetadataValue,
TableSchemaMetadataValue as TableSchemaMetadataValue,
TableSpecMetadataValue as TableSpecMetadataValue,
TextMetadataValue as TextMetadataValue,
TimestampMetadataValue as TimestampMetadataValue,
UrlMetadataValue as UrlMetadataValue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
TableRecord as TableRecord,
TableSchema as TableSchema,
TableSchemaMetadataValue as TableSchemaMetadataValue,
TableSpecMetadataValue as TableSpecMetadataValue,
TextMetadataValue as TextMetadataValue,
UrlMetadataValue as UrlMetadataValue,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Callable,
Expand All @@ -24,6 +23,7 @@
import dagster._check as check
import dagster._seven as seven
from dagster._annotations import PublicAttr, deprecated, deprecated_param, experimental, public
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.errors import DagsterInvalidMetadata
from dagster._serdes import whitelist_for_serdes
from dagster._serdes.serdes import (
Expand All @@ -41,20 +41,19 @@
from .table import ( # re-exported
TableColumn as TableColumn,
TableColumnConstraints as TableColumnConstraints,
TableColumnSpec as TableColumnSpec,
TableConstraints as TableConstraints,
TableRecord as TableRecord,
TableSchema as TableSchema,
TableSpec,
)

if TYPE_CHECKING:
from dagster._core.definitions.events import AssetKey

ArbitraryMetadataMapping: TypeAlias = Mapping[str, Any]

RawMetadataValue = Union[
"MetadataValue",
TableSchema,
"AssetKey",
AssetKey,
os.PathLike,
Dict[Any, Any],
float,
Expand Down Expand Up @@ -109,9 +108,11 @@ def normalize_metadata(
return normalized_metadata


def normalize_metadata_value(raw_value: RawMetadataValue) -> "MetadataValue[Any]":
from dagster._core.definitions.events import AssetKey
def has_corresponding_metadata_value_class(obj: Any) -> bool:
return isinstance(obj, (str, float, bool, int, list, dict, os.PathLike, AssetKey, TableSchema))


def normalize_metadata_value(raw_value: RawMetadataValue) -> "MetadataValue[Any]":
if isinstance(raw_value, MetadataValue):
return raw_value
elif isinstance(raw_value, str):
Expand All @@ -130,6 +131,8 @@ def normalize_metadata_value(raw_value: RawMetadataValue) -> "MetadataValue[Any]
return MetadataValue.asset(raw_value)
elif isinstance(raw_value, TableSchema):
return MetadataValue.table_schema(raw_value)
elif isinstance(raw_value, TableSpec):
return MetadataValue.table_spec(raw_value)
elif raw_value is None:
return MetadataValue.null()

Expand Down Expand Up @@ -574,6 +577,20 @@ def table_schema(
"""
return TableSchemaMetadataValue(schema)

@public
@staticmethod
def table_spec(
spec: TableSpec,
) -> "TableSpecMetadataValue":
"""Static constructor for a metadata value wrapping a table spec as
:py:class:`TableSpecMetadataValue`. Can be used as the value type
for the `metadata` parameter for supported events.
Args:
spec (TableSpec): The table spec for a metadata entry.
"""
return TableSpecMetadataValue(spec)

@public
@staticmethod
def null() -> "NullMetadataValue":
Expand Down Expand Up @@ -1051,6 +1068,29 @@ def value(self) -> TableSchema:
return self.schema


@whitelist_for_serdes
class TableSpecMetadataValue(
NamedTuple("_TableSpecMetadataValue", [("table_spec", PublicAttr[TableSpec])]),
MetadataValue[TableSpec],
):
"""Representation of the specification of arbitrary tabular data.
Args:
table_spec (TableSchema): The dictionary containing the schema representation.
"""

def __new__(cls, table_spec: TableSpec):
return super(TableSpecMetadataValue, cls).__new__(
cls, check.inst_param(table_spec, "table_spec", TableSpec)
)

@public
@property
def value(self) -> TableSpec:
"""TableSpec: The wrapped :py:class:`TableSpec`."""
return self.table_spec


@whitelist_for_serdes(storage_name="NullMetadataEntryData")
class NullMetadataValue(NamedTuple("_NullMetadataValue", []), MetadataValue[None]):
"""Representation of null."""
Expand Down Expand Up @@ -1259,9 +1299,11 @@ class TableMetadataEntries(NamespacedMetadataEntries, frozen=True):
Args:
column_schema (Optional[TableSchema]): The schema of the columns in the table.
table_spec (Optional[TableSpec]): The specifications of the table.
"""

column_schema: Optional[TableSchema] = None
table_spec: Optional[TableSpec] = None

@classmethod
def namespace(cls) -> str:
Expand Down
Loading

0 comments on commit 7f7c500

Please sign in to comment.