From 54b39a2595408ae723e77830919fc3219f268c70 Mon Sep 17 00:00:00 2001 From: Christopher DeCarolis Date: Wed, 11 Dec 2024 08:10:02 -0800 Subject: [PATCH] [dagster-airlift] Standardize asset key interpolation (#26141) In response to recent product review where we decided to standardize the interpolation of asset keys. --- .../in_airflow/materialize_assets_operator.py | 14 ++++++++++++-- .../dags/dag.py | 8 +++++++- .../dagster_defs.py | 16 ++++++++++++++-- .../test_materialize_assets_operator.py | 8 +++++++- 4 files changed, 40 insertions(+), 6 deletions(-) diff --git a/python_modules/libraries/dagster-airlift/dagster_airlift/in_airflow/materialize_assets_operator.py b/python_modules/libraries/dagster-airlift/dagster_airlift/in_airflow/materialize_assets_operator.py index 145df9af4e93a..9627b81438488 100644 --- a/python_modules/libraries/dagster-airlift/dagster_airlift/in_airflow/materialize_assets_operator.py +++ b/python_modules/libraries/dagster-airlift/dagster_airlift/in_airflow/materialize_assets_operator.py @@ -1,9 +1,13 @@ +import re from typing import Any, Iterable, Mapping, Sequence, Union from airflow.utils.context import Context from dagster_airlift.in_airflow.base_asset_operator import BaseDagsterAssetsOperator +UNESCAPED_SLASH_RE = re.compile(r"(? tuple: + parts = re.split(UNESCAPED_SLASH_RE, path_str) + return tuple(part.replace(ESCAPED_SLASH, "/") for part in parts) diff --git a/python_modules/libraries/dagster-airlift/dagster_airlift_tests/integration_tests/materialize_assets_operator_test_project/dags/dag.py b/python_modules/libraries/dagster-airlift/dagster_airlift_tests/integration_tests/materialize_assets_operator_test_project/dags/dag.py index ff52df064ef9d..ae8dc523724ca 100644 --- a/python_modules/libraries/dagster-airlift/dagster_airlift_tests/integration_tests/materialize_assets_operator_test_project/dags/dag.py +++ b/python_modules/libraries/dagster-airlift/dagster_airlift_tests/integration_tests/materialize_assets_operator_test_project/dags/dag.py @@ -45,5 +45,11 @@ def get_dagster_url(self, context: Context) -> str: # Test both string syntax and list of strings syntax. task_id="some_task", dag=dag, - asset_key_paths=["some_asset", ["other_asset"], ["nested", "asset"]], + asset_key_paths=[ + "some_asset", + ["other_asset"], + ["nested", "asset"], + "string/interpretation", + r"backslash\/interpretation", + ], ) diff --git a/python_modules/libraries/dagster-airlift/dagster_airlift_tests/integration_tests/materialize_assets_operator_test_project/dagster_defs.py b/python_modules/libraries/dagster-airlift/dagster_airlift_tests/integration_tests/materialize_assets_operator_test_project/dagster_defs.py index 31f45fd000c15..2db869d67c8c8 100644 --- a/python_modules/libraries/dagster-airlift/dagster_airlift_tests/integration_tests/materialize_assets_operator_test_project/dagster_defs.py +++ b/python_modules/libraries/dagster-airlift/dagster_airlift_tests/integration_tests/materialize_assets_operator_test_project/dagster_defs.py @@ -1,4 +1,4 @@ -from dagster import Definitions, asset +from dagster import AssetKey, AssetOut, Definitions, asset, multi_asset @asset @@ -16,4 +16,16 @@ def nested_asset(): return "nested_asset_value" -defs = Definitions(assets=[some_asset, other_asset, nested_asset]) +@asset(key=["string", "interpretation"]) +def string_interpretation(): + return "string_interpretation_value" + + +@multi_asset(outs={"my_out": AssetOut(key=AssetKey(["backslash/interpretation"]))}) +def backslash_interpretation(): + return "backslash_interpretation_value" + + +defs = Definitions( + assets=[some_asset, other_asset, nested_asset, string_interpretation, backslash_interpretation] +) diff --git a/python_modules/libraries/dagster-airlift/dagster_airlift_tests/integration_tests/test_materialize_assets_operator.py b/python_modules/libraries/dagster-airlift/dagster_airlift_tests/integration_tests/test_materialize_assets_operator.py index c36d252514421..89ca22a03920a 100644 --- a/python_modules/libraries/dagster-airlift/dagster_airlift_tests/integration_tests/test_materialize_assets_operator.py +++ b/python_modules/libraries/dagster-airlift/dagster_airlift_tests/integration_tests/test_materialize_assets_operator.py @@ -59,7 +59,13 @@ def test_dagster_operator(airflow_instance: None, dagster_dev: None, dagster_hom run for run in runs if set(list(run.asset_selection)) # type: ignore - == {AssetKey(["some_asset"]), AssetKey(["other_asset"]), AssetKey(["nested", "asset"])} + == { + AssetKey(["some_asset"]), + AssetKey(["other_asset"]), + AssetKey(["nested", "asset"]), + AssetKey(["string", "interpretation"]), + AssetKey(["backslash/interpretation"]), + } ][0] assert the_run.status == DagsterRunStatus.SUCCESS