diff --git a/docs/content/api/modules.json.gz b/docs/content/api/modules.json.gz index 0f530d082f10b..ffa4be55b335d 100644 Binary files a/docs/content/api/modules.json.gz and b/docs/content/api/modules.json.gz differ diff --git a/docs/content/api/searchindex.json.gz b/docs/content/api/searchindex.json.gz index bdbf261c33366..f22ac03460cbb 100644 Binary files a/docs/content/api/searchindex.json.gz and b/docs/content/api/searchindex.json.gz differ diff --git a/docs/content/api/sections.json.gz b/docs/content/api/sections.json.gz index e13ef405b2122..5c66d6ed5983d 100644 Binary files a/docs/content/api/sections.json.gz and b/docs/content/api/sections.json.gz differ diff --git a/docs/next/public/objects.inv b/docs/next/public/objects.inv index 27058861fe82e..e1340ce95211c 100644 Binary files a/docs/next/public/objects.inv and b/docs/next/public/objects.inv differ diff --git a/docs/sphinx/sections/api/apidocs/ops.rst b/docs/sphinx/sections/api/apidocs/ops.rst index 0762ca43acd0a..7c00cbc46807b 100644 --- a/docs/sphinx/sections/api/apidocs/ops.rst +++ b/docs/sphinx/sections/api/apidocs/ops.rst @@ -101,6 +101,8 @@ All metadata types inherit from `MetadataValue`. The following types are defined .. autoclass:: TableSchemaMetadataValue +.. autoclass:: TableSpecMetadataValue + .. autoclass:: TextMetadataValue .. autoclass:: TimestampMetadataValue diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index fdd7f13b3dd42..08b0844b975a8 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -3107,6 +3107,20 @@ type TableConstraints { other: [String!]! } +type TableColumnDep { + assetKey: AssetKey! + columnName: String! +} + +type TableColumnSpec { + columnName: String! + tableColumnDeps: [TableColumnDep!]! +} + +type TableSpec { + columnSpecs: TableColumnSpec! 
+} + type PipelineTag { key: String! value: String! diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index 4fba90061e377..08c9237889ff1 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -5133,6 +5133,18 @@ export type TableColumnConstraints = { unique: Scalars['Boolean']['output']; }; +export type TableColumnDep = { + __typename: 'TableColumnDep'; + assetKey: AssetKey; + columnName: Scalars['String']['output']; +}; + +export type TableColumnSpec = { + __typename: 'TableColumnSpec'; + columnName: Scalars['String']['output']; + tableColumnDeps: Array; +}; + export type TableConstraints = { __typename: 'TableConstraints'; other: Array; @@ -5158,6 +5170,11 @@ export type TableSchemaMetadataEntry = MetadataEntry & { schema: TableSchema; }; +export type TableSpec = { + __typename: 'TableSpec'; + columnSpecs: TableColumnSpec; +}; + export type Target = { __typename: 'Target'; mode: Scalars['String']['output']; @@ -14130,6 +14147,40 @@ export const buildTableColumnConstraints = ( }; }; +export const buildTableColumnDep = ( + overrides?: Partial, + _relationshipsToOmit: Set = new Set(), +): {__typename: 'TableColumnDep'} & TableColumnDep => { + const relationshipsToOmit: Set = new Set(_relationshipsToOmit); + relationshipsToOmit.add('TableColumnDep'); + return { + __typename: 'TableColumnDep', + assetKey: + overrides && overrides.hasOwnProperty('assetKey') + ? overrides.assetKey! + : relationshipsToOmit.has('AssetKey') + ? ({} as AssetKey) + : buildAssetKey({}, relationshipsToOmit), + columnName: + overrides && overrides.hasOwnProperty('columnName') ? overrides.columnName! 
: 'vitae', + }; +}; + +export const buildTableColumnSpec = ( + overrides?: Partial, + _relationshipsToOmit: Set = new Set(), +): {__typename: 'TableColumnSpec'} & TableColumnSpec => { + const relationshipsToOmit: Set = new Set(_relationshipsToOmit); + relationshipsToOmit.add('TableColumnSpec'); + return { + __typename: 'TableColumnSpec', + columnName: + overrides && overrides.hasOwnProperty('columnName') ? overrides.columnName! : 'quam', + tableColumnDeps: + overrides && overrides.hasOwnProperty('tableColumnDeps') ? overrides.tableColumnDeps! : [], + }; +}; + export const buildTableConstraints = ( overrides?: Partial, _relationshipsToOmit: Set = new Set(), @@ -14200,6 +14251,23 @@ export const buildTableSchemaMetadataEntry = ( }; }; +export const buildTableSpec = ( + overrides?: Partial, + _relationshipsToOmit: Set = new Set(), +): {__typename: 'TableSpec'} & TableSpec => { + const relationshipsToOmit: Set = new Set(_relationshipsToOmit); + relationshipsToOmit.add('TableSpec'); + return { + __typename: 'TableSpec', + columnSpecs: + overrides && overrides.hasOwnProperty('columnSpecs') + ? overrides.columnSpecs! + : relationshipsToOmit.has('TableColumnSpec') + ? 
({} as TableColumnSpec) + : buildTableColumnSpec({}, relationshipsToOmit), + }; +}; + export const buildTarget = ( overrides?: Partial, _relationshipsToOmit: Set = new Set(), diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/events.py b/python_modules/dagster-graphql/dagster_graphql/implementation/events.py index 9a4819d38a74b..9598a97c9553d 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/events.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/events.py @@ -25,6 +25,7 @@ from dagster._core.definitions.metadata import ( DagsterRunMetadataValue, MetadataValue, + TableSpecMetadataValue, ) from dagster._core.events import ( DagsterEventType, @@ -36,6 +37,9 @@ from dagster._core.execution.plan.inputs import StepInputData from dagster._core.execution.plan.outputs import StepOutputData +from dagster_graphql.schema.metadata import GrapheneTableSpecMetadataEntry +from dagster_graphql.schema.table import GrapheneTableSpec + MAX_INT = 2147483647 MIN_INT = -2147483648 @@ -165,6 +169,11 @@ def iterate_metadata_entries(metadata: Mapping[str, MetadataValue]) -> Iterator[ columns=value.schema.columns, ), ) + elif isinstance(value, TableSpecMetadataValue): + yield GrapheneTableSpecMetadataEntry( + label=key, + spec=GrapheneTableSpec(spec=value.table_spec), + ) elif isinstance(value, TimestampMetadataValue): yield GrapheneTimestampMetadataEntry(label=key, timestamp=value.value) else: diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/metadata.py b/python_modules/dagster-graphql/dagster_graphql/schema/metadata.py index e577452f4bc60..746d2573fc221 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/metadata.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/metadata.py @@ -1,7 +1,7 @@ import graphene from .asset_key import GrapheneAssetKey -from .table import GrapheneTable, GrapheneTableSchema +from .table import GrapheneTable, GrapheneTableSchema, GrapheneTableSpec 
class GrapheneMetadataItemDefinition(graphene.ObjectType): @@ -52,6 +52,14 @@ class Meta: name = "TableSchemaMetadataEntry" +class GrapheneTableSpecMetadataEntry(graphene.ObjectType): + spec = graphene.NonNull(GrapheneTableSpec) + + class Meta: + interfaces = (GrapheneMetadataEntry,) + name = "TableSpecMetadataEntry" + + class GrapheneJsonMetadataEntry(graphene.ObjectType): jsonString = graphene.NonNull(graphene.String) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/table.py b/python_modules/dagster-graphql/dagster_graphql/schema/table.py index 3887a772e3a17..1a8114f220d29 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/table.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/table.py @@ -1,5 +1,6 @@ import graphene +from .asset_key import GrapheneAssetKey from .util import non_null_list @@ -45,10 +46,36 @@ class Meta: name = "Table" +class GrapheneTableColumnDep(graphene.ObjectType): + assetKey = graphene.NonNull(GrapheneAssetKey) + columnName = graphene.NonNull(graphene.String) + + class Meta: + name = "TableColumnDep" + + +class GrapheneTableColumnSpec(graphene.ObjectType): + columnName = graphene.NonNull(graphene.String) + tableColumnDeps = non_null_list(GrapheneTableColumnDep) + + class Meta: + name = "TableColumnSpec" + + +class GrapheneTableSpec(graphene.ObjectType): + columnSpecs = graphene.NonNull(GrapheneTableColumnSpec) + + class Meta: + name = "TableSpec" + + types = [ GrapheneTable, GrapheneTableSchema, GrapheneTableColumn, GrapheneTableColumnConstraints, GrapheneTableConstraints, + GrapheneTableColumnDep, + GrapheneTableColumnSpec, + GrapheneTableSpec, ] diff --git a/python_modules/dagster/dagster/__init__.py b/python_modules/dagster/dagster/__init__.py index 4887e93b6fd8d..8feab8acd9cbc 100644 --- a/python_modules/dagster/dagster/__init__.py +++ b/python_modules/dagster/dagster/__init__.py @@ -263,6 +263,7 @@ PythonArtifactMetadataValue as PythonArtifactMetadataValue, TableMetadataValue as 
TableMetadataValue, TableSchemaMetadataValue as TableSchemaMetadataValue, + TableSpecMetadataValue as TableSpecMetadataValue, TextMetadataValue as TextMetadataValue, TimestampMetadataValue as TimestampMetadataValue, UrlMetadataValue as UrlMetadataValue, diff --git a/python_modules/dagster/dagster/_core/definitions/__init__.py b/python_modules/dagster/dagster/_core/definitions/__init__.py index 1253ae5749969..167d10d8e4d92 100644 --- a/python_modules/dagster/dagster/_core/definitions/__init__.py +++ b/python_modules/dagster/dagster/_core/definitions/__init__.py @@ -67,6 +67,7 @@ TableRecord as TableRecord, TableSchema as TableSchema, TableSchemaMetadataValue as TableSchemaMetadataValue, + TableSpecMetadataValue as TableSpecMetadataValue, TextMetadataValue as TextMetadataValue, UrlMetadataValue as UrlMetadataValue, ) diff --git a/python_modules/dagster/dagster/_core/definitions/metadata/__init__.py b/python_modules/dagster/dagster/_core/definitions/metadata/__init__.py index 6ad3efc34497c..546d6c1c718f4 100644 --- a/python_modules/dagster/dagster/_core/definitions/metadata/__init__.py +++ b/python_modules/dagster/dagster/_core/definitions/metadata/__init__.py @@ -2,7 +2,6 @@ from abc import ABC, abstractmethod from datetime import datetime from typing import ( - TYPE_CHECKING, AbstractSet, Any, Callable, @@ -24,6 +23,7 @@ import dagster._check as check import dagster._seven as seven from dagster._annotations import PublicAttr, deprecated, deprecated_param, experimental, public +from dagster._core.definitions.asset_key import AssetKey from dagster._core.errors import DagsterInvalidMetadata from dagster._serdes import whitelist_for_serdes from dagster._serdes.serdes import ( @@ -41,20 +41,19 @@ from .table import ( # re-exported TableColumn as TableColumn, TableColumnConstraints as TableColumnConstraints, + TableColumnSpec as TableColumnSpec, TableConstraints as TableConstraints, TableRecord as TableRecord, TableSchema as TableSchema, + TableSpec, ) -if TYPE_CHECKING: 
- from dagster._core.definitions.events import AssetKey - ArbitraryMetadataMapping: TypeAlias = Mapping[str, Any] RawMetadataValue = Union[ "MetadataValue", TableSchema, - "AssetKey", + AssetKey, os.PathLike, Dict[Any, Any], float, @@ -109,9 +108,11 @@ def normalize_metadata( return normalized_metadata -def normalize_metadata_value(raw_value: RawMetadataValue) -> "MetadataValue[Any]": - from dagster._core.definitions.events import AssetKey +def has_corresponding_metadata_value_class(obj: Any) -> bool: + return isinstance(obj, (str, float, bool, int, list, dict, os.PathLike, AssetKey, TableSchema)) + +def normalize_metadata_value(raw_value: RawMetadataValue) -> "MetadataValue[Any]": if isinstance(raw_value, MetadataValue): return raw_value elif isinstance(raw_value, str): @@ -130,6 +131,8 @@ def normalize_metadata_value(raw_value: RawMetadataValue) -> "MetadataValue[Any] return MetadataValue.asset(raw_value) elif isinstance(raw_value, TableSchema): return MetadataValue.table_schema(raw_value) + elif isinstance(raw_value, TableSpec): + return MetadataValue.table_spec(raw_value) elif raw_value is None: return MetadataValue.null() @@ -574,6 +577,20 @@ def table_schema( """ return TableSchemaMetadataValue(schema) + @public + @staticmethod + def table_spec( + spec: TableSpec, + ) -> "TableSpecMetadataValue": + """Static constructor for a metadata value wrapping a table spec as + :py:class:`TableSpecMetadataValue`. Can be used as the value type + for the `metadata` parameter for supported events. + + Args: + spec (TableSpec): The table spec for a metadata entry. + """ + return TableSpecMetadataValue(spec) + @public @staticmethod def null() -> "NullMetadataValue": @@ -1051,6 +1068,29 @@ def value(self) -> TableSchema: return self.schema +@whitelist_for_serdes +class TableSpecMetadataValue( + NamedTuple("_TableSpecMetadataValue", [("table_spec", PublicAttr[TableSpec])]), + MetadataValue[TableSpec], +): + """Representation of the specification of arbitrary tabular data. 
+ + Args: + table_spec (TableSpec): The table spec wrapped by this metadata value. + """ + + def __new__(cls, table_spec: TableSpec): + return super(TableSpecMetadataValue, cls).__new__( + cls, check.inst_param(table_spec, "table_spec", TableSpec) + ) + + @public + @property + def value(self) -> TableSpec: + """TableSpec: The wrapped :py:class:`TableSpec`.""" + return self.table_spec + + @whitelist_for_serdes(storage_name="NullMetadataEntryData") class NullMetadataValue(NamedTuple("_NullMetadataValue", []), MetadataValue[None]): """Representation of null.""" @@ -1259,9 +1299,11 @@ class TableMetadataEntries(NamespacedMetadataEntries, frozen=True): Args: column_schema (Optional[TableSchema]): The schema of the columns in the table. + table_spec (Optional[TableSpec]): The specifications of the table. """ column_schema: Optional[TableSchema] = None + table_spec: Optional[TableSpec] = None @classmethod def namespace(cls) -> str: diff --git a/python_modules/dagster/dagster/_core/definitions/metadata/table.py b/python_modules/dagster/dagster/_core/definitions/metadata/table.py index cf2f60b78b8e9..e04c26f3dd3fa 100644 --- a/python_modules/dagster/dagster/_core/definitions/metadata/table.py +++ b/python_modules/dagster/dagster/_core/definitions/metadata/table.py @@ -2,6 +2,7 @@ import dagster._check as check from dagster._annotations import PublicAttr, experimental, public +from dagster._core.definitions.asset_key import AssetKey from dagster._serdes.serdes import ( whitelist_for_serdes, ) @@ -259,3 +260,76 @@ def __new__( _DEFAULT_TABLE_COLUMN_CONSTRAINTS = TableColumnConstraints() + + +# ########################### +# ##### TABLE COLUMN LINEAGE +# ########################### + + +@whitelist_for_serdes +class TableColumnDep( + NamedTuple( + "_TableColumnDep", + [ + ("asset_key", PublicAttr[AssetKey]), + ("column_name", PublicAttr[str]), + ], + ) +): + """Object representing an identifier for a column in an asset.""" + + def __new__( + cls, + asset_key: AssetKey, 
+ column_name: str, + ): + return super(TableColumnDep, cls).__new__( + cls, + asset_key=check.inst_param(asset_key, "asset_key", AssetKey), + column_name=check.str_param(column_name, "column_name"), + ) + + +@whitelist_for_serdes +class TableColumnSpec( + NamedTuple( + "_TableColumnSpec", + [ + ("column_name", PublicAttr[str]), + ("table_column_deps", PublicAttr[Sequence[TableColumnDep]]), + ], + ) +): + """Represents the core attributes of a column for the current tabular asset.""" + + def __new__( + cls, + column_name: str, + table_column_deps: Sequence[TableColumnDep], + ): + return super(TableColumnSpec, cls).__new__( + cls, + column_name=check.str_param(column_name, "column_name"), + table_column_deps=check.list_param( + table_column_deps, "table_column_deps", TableColumnDep + ), + ) + + +@whitelist_for_serdes +class TableSpec( + NamedTuple( + "_TableSpec", + [ + ("column_specs", PublicAttr[Sequence[TableColumnSpec]]), + ], + ) +): + """Represents the core attributes of a tabular asset.""" + + def __new__(cls, column_specs: Sequence[TableColumnSpec]): + return super(TableSpec, cls).__new__( + cls, + column_specs=check.list_param(column_specs, "column_specs", TableColumnSpec), + ) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/metadata_tests/test_table_metadata_entries.py b/python_modules/dagster/dagster_tests/definitions_tests/metadata_tests/test_table_metadata_entries.py index 459ea644f44f2..84dd3b892b066 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/metadata_tests/test_table_metadata_entries.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/metadata_tests/test_table_metadata_entries.py @@ -1,5 +1,8 @@ -from dagster import AssetMaterialization, TableColumn, TableSchema -from dagster._core.definitions.metadata import TableMetadataEntries +from dagster import AssetKey, AssetMaterialization, TableColumn, TableSchema +from dagster._core.definitions.metadata import ( + TableMetadataEntries, +) +from 
dagster._core.definitions.metadata.table import TableColumnDep, TableColumnSpec, TableSpec def test_table_metadata_entries(): @@ -18,3 +21,36 @@ def test_table_metadata_entries(): assert dict(TableMetadataEntries()) == {} assert TableMetadataEntries.extract(dict(TableMetadataEntries())) == TableMetadataEntries() + + +def test_column_specs() -> None: + expected_table_spec = TableSpec( + column_specs=[ + TableColumnSpec( + column_name="column", + table_column_deps=[ + TableColumnDep( + asset_key=AssetKey("upstream"), + column_name="upstream_column", + ) + ], + ) + ] + ) + expected_metadata = {"dagster/table_spec": expected_table_spec} + + table_metadata_entries = TableMetadataEntries(table_spec=expected_table_spec) + + dict_table_metadata_entries = dict(table_metadata_entries) + assert dict_table_metadata_entries == expected_metadata + + materialization = AssetMaterialization(asset_key="foo", metadata=dict_table_metadata_entries) + extracted_table_metadata_entries = TableMetadataEntries.extract(materialization.metadata) + assert extracted_table_metadata_entries.table_spec == expected_table_spec + + splat_table_metadata_entries = {**table_metadata_entries} + assert splat_table_metadata_entries == expected_metadata + + materialization = AssetMaterialization(asset_key="foo", metadata=splat_table_metadata_entries) + extracted_table_metadata_entries = TableMetadataEntries.extract(materialization.metadata) + assert extracted_table_metadata_entries.table_spec == expected_table_spec diff --git a/python_modules/dagster/dagster_tests/execution_tests/test_metadata.py b/python_modules/dagster/dagster_tests/execution_tests/test_metadata.py index df6e3b8d1f5b9..c31c2c17c07ec 100644 --- a/python_modules/dagster/dagster_tests/execution_tests/test_metadata.py +++ b/python_modules/dagster/dagster_tests/execution_tests/test_metadata.py @@ -23,19 +23,24 @@ op, ) from dagster._check import CheckError +from dagster._core.definitions.asset_key import AssetKey from 
dagster._core.definitions.decorators.asset_decorator import asset from dagster._core.definitions.definitions_class import Definitions from dagster._core.definitions.metadata import ( DagsterInvalidMetadata, TableMetadataValue, + TableSpecMetadataValue, normalize_metadata, ) from dagster._core.definitions.metadata.table import ( TableColumn, TableColumnConstraints, + TableColumnDep, + TableColumnSpec, TableConstraints, TableRecord, TableSchema, + TableSpec, ) from dagster._core.execution.execution_result import ExecutionResult from dagster._core.snap.node import build_node_defs_snapshot @@ -63,6 +68,18 @@ def the_op(_context): "path": MetadataValue.path(Path("/a/b.csv")), "python": MetadataValue.python_artifact(MetadataValue), "timestamp": MetadataValue.timestamp(2000.5), + "table_spec": MetadataValue.table_spec( + TableSpec( + column_specs=[ + TableColumnSpec( + column_name="foo", + table_column_deps=[ + TableColumnDep(asset_key=AssetKey("bar"), column_name="baz") + ], + ) + ] + ) + ), }, ) @@ -80,7 +97,7 @@ def the_job(): ) assert len(materialization_events) == 1 materialization = materialization_events[0].event_specific_data.materialization - assert len(materialization.metadata) == 7 + assert len(materialization.metadata) == 8 entry_map = {k: v.__class__ for k, v in materialization.metadata.items()} assert entry_map["text"] == TextMetadataValue assert entry_map["int"] == IntMetadataValue @@ -89,6 +106,7 @@ def the_job(): assert entry_map["path"] == PathMetadataValue assert entry_map["python"] == PythonArtifactMetadataValue assert entry_map["timestamp"] == TimestampMetadataValue + assert entry_map["table_spec"] == TableSpecMetadataValue def test_metadata_asset_observation(): @@ -375,6 +393,20 @@ def test_table_serialization(): assert deserialize_value(serialized, TableMetadataValue) == table_metadata +def test_metadata_value_table_spec() -> None: + expected_table_spec = TableSpec( + column_specs=[ + TableColumnSpec( + column_name="foo", + 
table_column_deps=[TableColumnDep(asset_key=AssetKey("bar"), column_name="baz")], + ) + ] + ) + table_spec_metadata_value = MetadataValue.table_spec(expected_table_spec) + + assert table_spec_metadata_value.value == expected_table_spec + + def test_bool_metadata_value(): @op(out={}) def the_op():