Skip to content

Commit

Permalink
[5/n][dagster-sling] Update callsites with DagsterSlingTranslator.get…
Browse files Browse the repository at this point in the history
…_asset_spec
  • Loading branch information
maximearmstrong committed Jan 16, 2025
1 parent c5ceba9 commit 47b221d
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def _get_replication_streams_for_context(
streams = streams_with_default_dagster_meta(raw_streams, replication_config)
selected_asset_keys = context.selected_asset_keys
for stream in streams:
asset_key = dagster_sling_translator.get_asset_key(stream)
asset_key = dagster_sling_translator.get_asset_spec(stream).key
if asset_key in selected_asset_keys:
context_streams.update({stream["name"]: stream["config"]})

Expand Down Expand Up @@ -401,7 +401,7 @@ def _replicate(
# TODO: In the future, it'd be nice to yield these materializations as they come in
# rather than waiting until the end of the replication
for stream in stream_definitions:
asset_key = dagster_sling_translator.get_asset_key(stream)
asset_key = dagster_sling_translator.get_asset_spec(stream).key

object_key = (stream.get("config") or {}).get("object")
destination_stream_name = object_key or stream["name"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,15 +230,9 @@ class CustomSlingTranslator(DagsterSlingTranslator):
def get_asset_spec(self, stream_definition: Mapping[str, Any]) -> AssetSpec:
default_spec = super().get_asset_spec(stream_definition)
return default_spec.replace_attributes(
kinds=["sling", "foo"], tags={"custom_tag": "custom_value"}
kinds={"sling", "foo"}, tags={"custom_tag": "custom_value"}
)

def get_tags(self, stream_definition):
return {"custom_tag": "custom_value"}

def get_kinds(self, stream_definition):
return ["sling", "foo"]

@sling_assets(
replication_config=replication_config_path,
dagster_sling_translator=CustomSlingTranslator(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ def test_sling_translator_sanitize(test, expected):
),
],
)
def test_get_asset_key(stream, expected):
def test_asset_key_from_get_asset_spec(stream, expected):
translator = DagsterSlingTranslator()
assert translator.get_asset_key(stream) == AssetKey.from_user_string(expected)
assert translator.get_asset_spec(stream).key == AssetKey.from_user_string(expected)


def test_get_asset_key_error():
def test_asset_key_from_get_asset_spec_error():
stream_definition = {"name": "foo", "config": {"meta": {"dagster": {"asset_key": "foo$bar"}}}}
translator = DagsterSlingTranslator()
with pytest.raises(ValueError):
translator.get_asset_key(stream_definition)
translator.get_asset_spec(stream_definition).key # noqa


@pytest.mark.parametrize(
Expand All @@ -55,16 +55,18 @@ def test_get_asset_key_error():
),
],
)
def test_get_deps_asset_key(stream, expected):
def test_deps_asset_key_from_get_asset_spec(stream, expected):
translator = DagsterSlingTranslator()
assert translator.get_deps_asset_key(stream) == [AssetKey.from_user_string(e) for e in expected]
assert [dep.asset_key for dep in translator.get_asset_spec(stream).deps] == [
AssetKey.from_user_string(e) for e in expected
]


def test_get_deps_asset_key_error():
def test_deps_asset_key_from_get_asset_spec_error():
stream_definition = {"name": "foo", "config": {"meta": {"dagster": {"deps": "foo$bar"}}}}
translator = DagsterSlingTranslator()
with pytest.raises(ValueError):
translator.get_deps_asset_key(stream_definition)
translator.get_asset_spec(stream_definition).deps # noqa


@pytest.mark.parametrize(
Expand All @@ -78,9 +80,9 @@ def test_get_deps_asset_key_error():
),
],
)
def test_get_description(stream, expected):
def test_description_from_get_asset_spec(stream, expected):
translator = DagsterSlingTranslator()
assert translator.get_description(stream) == expected
assert translator.get_asset_spec(stream).description == expected


@pytest.mark.parametrize(
Expand All @@ -93,10 +95,12 @@ def test_get_description(stream, expected):
),
],
)
def test_get_metadata(stream, expected):
def test_metadata_from_get_asset_spec(stream, expected):
translator = DagsterSlingTranslator()
stream = {"name": "foo", "config": {"foo": "bar"}}
assert translator.get_metadata(stream) == {"stream_config": JsonMetadataValue(stream["config"])}
assert translator.get_asset_spec(stream).metadata == {
"stream_config": JsonMetadataValue(stream["config"])
}


@pytest.mark.parametrize(
Expand All @@ -109,9 +113,9 @@ def test_get_metadata(stream, expected):
),
],
)
def test_get_group_name(stream, expected):
def test_group_name_from_get_asset_spec(stream, expected):
translator = DagsterSlingTranslator()
assert translator.get_group_name(stream) == expected
assert translator.get_asset_spec(stream).group_name == expected


@pytest.mark.parametrize(
Expand All @@ -127,9 +131,9 @@ def test_get_group_name(stream, expected):
),
],
)
def test_get_freshness_policy(stream, expected):
def test_freshness_policy_from_get_asset_spec(stream, expected):
translator = DagsterSlingTranslator()
assert translator.get_freshness_policy(stream) == expected
assert translator.get_asset_spec(stream).freshness_policy == expected


@pytest.mark.parametrize(
Expand All @@ -145,6 +149,6 @@ def test_get_freshness_policy(stream, expected):
),
],
)
def test_get_auto_materialize_policy(stream, expected):
def test_auto_materialize_policy_from_get_asset_spec(stream, expected):
translator = DagsterSlingTranslator()
assert translator.get_auto_materialize_policy(stream) == expected
assert translator.get_asset_spec(stream).auto_materialize_policy == expected

0 comments on commit 47b221d

Please sign in to comment.