Skip to content

Commit

Permalink
[dagster-airlift] Standardize asset key interpolation (#26141)
Browse files Browse the repository at this point in the history
In response to recent product review where we decided to standardize the
interpolation of asset keys.
  • Loading branch information
dpeng817 authored Dec 11, 2024
1 parent 5d9eee0 commit 54b39a2
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import re
from typing import Any, Iterable, Mapping, Sequence, Union

from airflow.utils.context import Context

from dagster_airlift.in_airflow.base_asset_operator import BaseDagsterAssetsOperator

UNESCAPED_SLASH_RE = re.compile(r"(?<!\\)/")
ESCAPED_SLASH = "\\/"


class BaseMaterializeAssetsOperator(BaseDagsterAssetsOperator):
"""An operator base class that proxies execution to a user-provided list of Dagster assets.
Expand All @@ -12,13 +16,14 @@ class BaseMaterializeAssetsOperator(BaseDagsterAssetsOperator):
Args:
asset_key_paths (Sequence[Union[str, Sequence[str]]]): A sequence of asset key paths to materialize.
Each path in the sequence can be a string, which is treated as an asset key path with a single
component, or a sequence of strings representing a path with multiple components. For more,
component per "/" key, or a sequence of strings representing a path with multiple components. For more,
see the docs on asset keys: https://docs.dagster.io/concepts/assets/software-defined-assets#multi-component-asset-keys
"""

def __init__(self, asset_key_paths: Sequence[Union[str, Sequence[str]]], *args, **kwargs):
self.asset_key_paths = [
(path,) if isinstance(path, str) else tuple(path) for path in asset_key_paths
_get_path_from_str(path) if isinstance(path, str) else tuple(path)
for path in asset_key_paths
]
super().__init__(*args, **kwargs)

Expand All @@ -31,3 +36,8 @@ def filter_asset_nodes(
f"Could not find all asset key paths {self.asset_key_paths} in the asset nodes. Found: {list(hashable_path_to_node.keys())}"
)
yield from [hashable_path_to_node[path] for path in self.asset_key_paths]


def _get_path_from_str(path_str: str) -> tuple:
parts = re.split(UNESCAPED_SLASH_RE, path_str)
return tuple(part.replace(ESCAPED_SLASH, "/") for part in parts)
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,11 @@ def get_dagster_url(self, context: Context) -> str:
# Test both string syntax and list of strings syntax.
task_id="some_task",
dag=dag,
asset_key_paths=["some_asset", ["other_asset"], ["nested", "asset"]],
asset_key_paths=[
"some_asset",
["other_asset"],
["nested", "asset"],
"string/interpretation",
r"backslash\/interpretation",
],
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from dagster import Definitions, asset
from dagster import AssetKey, AssetOut, Definitions, asset, multi_asset


@asset
Expand All @@ -16,4 +16,16 @@ def nested_asset():
return "nested_asset_value"


defs = Definitions(assets=[some_asset, other_asset, nested_asset])
@asset(key=["string", "interpretation"])
def string_interpretation():
return "string_interpretation_value"


@multi_asset(outs={"my_out": AssetOut(key=AssetKey(["backslash/interpretation"]))})
def backslash_interpretation():
return "backslash_interpretation_value"


defs = Definitions(
assets=[some_asset, other_asset, nested_asset, string_interpretation, backslash_interpretation]
)
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,13 @@ def test_dagster_operator(airflow_instance: None, dagster_dev: None, dagster_hom
run
for run in runs
if set(list(run.asset_selection)) # type: ignore
== {AssetKey(["some_asset"]), AssetKey(["other_asset"]), AssetKey(["nested", "asset"])}
== {
AssetKey(["some_asset"]),
AssetKey(["other_asset"]),
AssetKey(["nested", "asset"]),
AssetKey(["string", "interpretation"]),
AssetKey(["backslash/interpretation"]),
}
][0]
assert the_run.status == DagsterRunStatus.SUCCESS

Expand Down

0 comments on commit 54b39a2

Please sign in to comment.