diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py index 523c3dde72029..b389e889601b7 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py @@ -16,7 +16,6 @@ AssetMaterialization, ConfigurableResource, EnvVar, - MaterializeResult, OpExecutionContext, PermissiveConfig, get_dagster_logger, @@ -33,6 +32,7 @@ streams_with_default_dagster_meta, ) from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator +from dagster_embedded_elt.sling.sling_event_iterator import SlingEventIterator from dagster_embedded_elt.sling.sling_replication import SlingReplicationParam, validate_replication logger = get_dagster_logger() @@ -429,7 +429,7 @@ def replicate( replication_config: Optional[SlingReplicationParam] = None, dagster_sling_translator: Optional[DagsterSlingTranslator] = None, debug: bool = False, - ) -> Generator[Union[MaterializeResult, AssetMaterialization], None, None]: + ) -> SlingEventIterator[AssetMaterialization]: """Runs a Sling replication from the given replication config. 
Args: @@ -441,18 +441,36 @@ def replicate( Returns: Generator[Union[MaterializeResult, AssetMaterialization], None, None]: A generator of MaterializeResult or AssetMaterialization """ - # attempt to retrieve params from asset context if not passed as a parameter if not (replication_config or dagster_sling_translator): metadata_by_key = context.assets_def.metadata_by_key first_asset_metadata = next(iter(metadata_by_key.values())) dagster_sling_translator = first_asset_metadata.get(METADATA_KEY_TRANSLATOR) replication_config = first_asset_metadata.get(METADATA_KEY_REPLICATION_CONFIG) + replication_config_dict = dict(validate_replication(replication_config)) + return SlingEventIterator( + self._replicate( + context=context, + replication_config=replication_config_dict, + dagster_sling_translator=dagster_sling_translator, + debug=debug, + ), + sling_cli=self, + replication_config=replication_config_dict, + ) + + def _replicate( + self, + *, + context: Union[OpExecutionContext, AssetExecutionContext], + replication_config: Dict[str, Any], + dagster_sling_translator: Optional[DagsterSlingTranslator], + debug: bool, + ) -> Iterator[AssetMaterialization]: # if translator has not been defined on metadata _or_ through param, then use the default constructor dagster_sling_translator = dagster_sling_translator or DagsterSlingTranslator() # convert to dict to enable updating the index - replication_config = dict(validate_replication(replication_config)) context_streams = self._get_replication_streams_for_context(context) if context_streams: replication_config.update({"streams": context_streams}) @@ -483,24 +501,20 @@ def replicate( return_output=True, env=env, ) - for row in results.split("\n"): - clean_line = self._clean_line(row) - sys.stdout.write(clean_line + "\n") - self._stdout.append(clean_line) + for row in results.split("\n"): + clean_line = self._clean_line(row) + sys.stdout.write(clean_line + "\n") + self._stdout.append(clean_line) - end_time = time.time() + end_time 
= time.time() - has_asset_def: bool = bool(context and context.has_assets_def) - - for stream in stream_definition: - output_name = dagster_sling_translator.get_asset_key(stream) - if has_asset_def: - yield MaterializeResult( - asset_key=output_name, metadata={"elapsed_time": end_time - start_time} - ) - else: + # TODO: In the future, it'd be nice to yield these materializations as they come in + # rather than waiting until the end of the replication + for stream in stream_definition: + output_name = dagster_sling_translator.get_asset_key(stream) yield AssetMaterialization( - asset_key=output_name, metadata={"elapsed_time": end_time - start_time} + asset_key=output_name, + metadata={"elapsed_time": end_time - start_time, "stream_name": stream["name"]}, ) def stream_raw_logs(self) -> Generator[str, None, None]: diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/sling_event_iterator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/sling_event_iterator.py new file mode 100644 index 0000000000000..caaf77e0628aa --- /dev/null +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/sling_event_iterator.py @@ -0,0 +1,35 @@ +from collections import abc +from typing import TYPE_CHECKING, Any, Dict, Generic, Iterator + +from dagster import AssetMaterialization +from typing_extensions import TypeVar + +if TYPE_CHECKING: + from .resources import SlingResource + + +SlingEventType = AssetMaterialization + +# We define SlingEventIterator as a generic type for the sake of type hinting. +# This is so that users who inspect the type of the return value of `SlingResource.replicate()` +# will be able to see the inner type of the iterator, rather than just `SlingEventIterator`. 
+T = TypeVar("T", bound=SlingEventType)
+
+
+class SlingEventIterator(Generic[T], abc.Iterator):
+    """Iterator wrapper around the events produced by a Sling replication.
+
+    Iteration is delegated unchanged to the wrapped iterator. The Sling resource and the
+    replication config supplied at construction are stored on the instance next to it;
+    no method of this class reads them.
+
+    NOTE(review): presumably they are retained so that post-processing helpers (for example
+    fetching column metadata) can be added here later -- confirm.
+    """
+
+    def __init__(
+        self,
+        events: Iterator[T],
+        sling_cli: "SlingResource",
+        replication_config: Dict[str, Any],
+    ) -> None:
+        """Store the wrapped iterator together with its replication context.
+
+        Args:
+            events: The iterator of Sling events to wrap.
+            sling_cli: The Sling resource associated with the events;
+                ``SlingResource.replicate()`` passes itself.
+            replication_config: The replication config as a dict;
+                ``SlingResource.replicate()`` passes the validated config.
+        """
+        self._replication_config = replication_config
+        self._sling_cli = sling_cli
+        self._inner_iterator = events
+
+    def __iter__(self) -> "SlingEventIterator[T]":
+        # This object is its own iterator.
+        return self
+
+    def __next__(self) -> T:
+        # StopIteration raised by the wrapped iterator propagates to the caller as-is.
+        upcoming = next(self._inner_iterator)
+        return upcoming