Skip to content

Commit

Permalink
[external-assets] Add AssetNode to AssetGraph
Browse files Browse the repository at this point in the history
[INTERNAL_BRANCH=sean/external-assets-asset-graph-nodes-1]
  • Loading branch information
smackesey committed Mar 4, 2024
1 parent e344cab commit ca5d1cf
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import dagster._check as check
from dagster._core.definitions.asset_subset import ValidAssetSubset
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.metadata import ArbitraryMetadataMapping
from dagster._core.errors import DagsterInvalidInvocationError
from dagster._core.instance import DynamicPartitionsStore
from dagster._core.selector.subset_selector import (
Expand Down Expand Up @@ -141,6 +142,11 @@ def is_external(self) -> bool:
def is_executable(self) -> bool:
...

@property
@abstractmethod
def metadata(self) -> ArbitraryMetadataMapping:
...

@property
@abstractmethod
def is_partitioned(self) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.assets_job import ASSET_BASE_JOB_PREFIX
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.metadata import ArbitraryMetadataMapping
from dagster._core.host_representation.external import ExternalRepository
from dagster._core.host_representation.handle import RepositoryHandle
from dagster._core.workspace.workspace import IWorkspace
Expand Down Expand Up @@ -80,6 +81,10 @@ def is_external(self) -> bool:
def is_executable(self) -> bool:
return any(node.is_executable for node in self._external_asset_nodes)

@property
def metadata(self) -> ArbitraryMetadataMapping:
return self._priority_node.metadata

@property
def is_partitioned(self) -> bool:
return self._priority_node.partitions_def_data is not None
Expand Down Expand Up @@ -345,7 +350,7 @@ def all_job_names(self) -> AbstractSet[str]:

@property
@cached_method
def repository_handles_by_key(self) -> Mapping[AssetKey, Sequence[RepositoryHandle]]:
def repository_handles_by_key(self) -> Mapping[AssetKey, RepositoryHandle]:
return {k: node.priority_repository_handle for k, node in self.asset_nodes_by_key.items()}

def get_repository_handle(self, asset_key: AssetKey) -> RepositoryHandle:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from dagster._core.definitions.backfill_policy import BackfillPolicy
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.metadata import ArbitraryMetadataMapping
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.definitions.partition_mapping import PartitionMapping
from dagster._core.definitions.resolved_asset_deps import ResolvedAssetDependencies
Expand Down Expand Up @@ -46,6 +47,10 @@ def is_external(self) -> bool:
def is_executable(self) -> bool:
return self.assets_def.is_executable

@property
def metadata(self) -> ArbitraryMetadataMapping:
return self.assets_def.metadata_by_key.get(self.key, {})

@property
def is_partitioned(self) -> bool:
return self.assets_def.partitions_def is not None
Expand Down

0 comments on commit ca5d1cf

Please sign in to comment.