Skip to content

Commit

Permalink
Added filtering of disabled streams in asset decorator and added test…
Browse files Browse the repository at this point in the history
…s for disabled asset configs.
  • Loading branch information
PedramNavid committed Feb 25, 2024
1 parent 284a60c commit 62e3b11
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pyright/master/requirements-pinned.txt
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ pyarrow-hotfix==0.6
pyasn1==0.5.1
pyasn1-modules==0.3.0
pycparser==2.21
pydantic==2.6.0
pydantic==1.10.14
pydata-google-auth==1.8.2
pyflakes==3.2.0
Pygments==2.17.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ def get_streams_from_replication(
replication_config: Mapping[str, Any],
) -> Iterable[Mapping[str, Any]]:
"""Returns a list of streams and their configs from a Sling replication config."""
return [
{"name": stream, "config": config}
for stream, config in replication_config.get("streams", {}).items()
]
for stream, config in replication_config.get("streams", {}).items():
if config and config.get("disabled", False):
continue
yield {"name": stream, "config": config}


def sling_assets(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ streams:
sql: |
select all_user_id, name
from public."all_Users"
object: public.all_users # need to add 'object' key for custom SQL
object: public.all_users # need to add 'object' key for custom SQL
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import sqlite3

import pytest
from dagster import AssetKey
from dagster import AssetKey, file_relative_path
from dagster._core.definitions.materialize import materialize
from dagster_embedded_elt.sling import (
SlingReplicationParam,
Expand Down Expand Up @@ -34,6 +34,26 @@ def my_sling_assets():
}


def test_disabled_asset():
@sling_assets(
replication_config=file_relative_path(
__file__, "replication_configs/base_config_disabled/replication.yaml"
)
)
def my_sling_assets():
...

assert my_sling_assets.keys == {
AssetKey.from_user_string(key)
for key in [
"target/public/accounts",
"target/departments",
"target/public/transactions",
"target/public/all_users",
]
}


def test_runs_base_sling_config(
csv_to_sqlite_replication_config: SlingReplicationParam,
path_to_test_csv: str,
Expand Down

0 comments on commit 62e3b11

Please sign in to comment.