Skip to content

Commit

Permalink
[1/p][dagster-embedded-elt] add SlingEventIterator (#23387)
Browse files Browse the repository at this point in the history
## Summary

Adds `SlingEventIterator` class which we can use to chain subsequent
computation on Sling syncs. Motivated by stacked PR #23388.

## Test Plan

Existing unit tests pass; also inspected the code in an editor to confirm
that the type hints resolve correctly.
  • Loading branch information
benpankow authored Aug 6, 2024
1 parent 07b5902 commit c137304
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
AssetMaterialization,
ConfigurableResource,
EnvVar,
MaterializeResult,
OpExecutionContext,
PermissiveConfig,
get_dagster_logger,
Expand All @@ -33,6 +32,7 @@
streams_with_default_dagster_meta,
)
from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator
from dagster_embedded_elt.sling.sling_event_iterator import SlingEventIterator
from dagster_embedded_elt.sling.sling_replication import SlingReplicationParam, validate_replication

logger = get_dagster_logger()
Expand Down Expand Up @@ -429,7 +429,7 @@ def replicate(
replication_config: Optional[SlingReplicationParam] = None,
dagster_sling_translator: Optional[DagsterSlingTranslator] = None,
debug: bool = False,
) -> Generator[Union[MaterializeResult, AssetMaterialization], None, None]:
) -> SlingEventIterator[AssetMaterialization]:
"""Runs a Sling replication from the given replication config.
Args:
Expand All @@ -441,18 +441,36 @@ def replicate(
Returns:
SlingEventIterator[AssetMaterialization]: An iterator of AssetMaterialization events
"""
# attempt to retrieve params from asset context if not passed as a parameter
if not (replication_config or dagster_sling_translator):
metadata_by_key = context.assets_def.metadata_by_key
first_asset_metadata = next(iter(metadata_by_key.values()))
dagster_sling_translator = first_asset_metadata.get(METADATA_KEY_TRANSLATOR)
replication_config = first_asset_metadata.get(METADATA_KEY_REPLICATION_CONFIG)

replication_config_dict = dict(validate_replication(replication_config))
return SlingEventIterator(
self._replicate(
context=context,
replication_config=replication_config_dict,
dagster_sling_translator=dagster_sling_translator,
debug=debug,
),
sling_cli=self,
replication_config=replication_config_dict,
)

def _replicate(
self,
*,
context: Union[OpExecutionContext, AssetExecutionContext],
replication_config: Dict[str, Any],
dagster_sling_translator: Optional[DagsterSlingTranslator],
debug: bool,
) -> Iterator[AssetMaterialization]:
# if translator has not been defined on metadata _or_ through param, then use the default constructor
dagster_sling_translator = dagster_sling_translator or DagsterSlingTranslator()

# convert to dict to enable updating the index
replication_config = dict(validate_replication(replication_config))
context_streams = self._get_replication_streams_for_context(context)
if context_streams:
replication_config.update({"streams": context_streams})
Expand Down Expand Up @@ -483,24 +501,20 @@ def replicate(
return_output=True,
env=env,
)
for row in results.split("\n"):
clean_line = self._clean_line(row)
sys.stdout.write(clean_line + "\n")
self._stdout.append(clean_line)
for row in results.split("\n"):
clean_line = self._clean_line(row)
sys.stdout.write(clean_line + "\n")
self._stdout.append(clean_line)

end_time = time.time()
end_time = time.time()

has_asset_def: bool = bool(context and context.has_assets_def)

for stream in stream_definition:
output_name = dagster_sling_translator.get_asset_key(stream)
if has_asset_def:
yield MaterializeResult(
asset_key=output_name, metadata={"elapsed_time": end_time - start_time}
)
else:
# TODO: In the future, it'd be nice to yield these materializations as they come in
# rather than waiting until the end of the replication
for stream in stream_definition:
output_name = dagster_sling_translator.get_asset_key(stream)
yield AssetMaterialization(
asset_key=output_name, metadata={"elapsed_time": end_time - start_time}
asset_key=output_name,
metadata={"elapsed_time": end_time - start_time, "stream_name": stream["name"]},
)

def stream_raw_logs(self) -> Generator[str, None, None]:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from collections import abc
from typing import TYPE_CHECKING, Any, Dict, Generic, Iterator

from dagster import AssetMaterialization
from typing_extensions import TypeVar

if TYPE_CHECKING:
from .resources import SlingResource


# Alias for the event type a Sling event iterator yields; it is used below as the
# upper bound of the iterator's type parameter.
SlingEventType = AssetMaterialization

# We define SlingEventIterator as a generic type for the sake of type hinting.
# This is so that users who inspect the type of the return value of `SlingResource.replicate()`
# will be able to see the inner type of the iterator, rather than just `SlingEventIterator`.
# The bound restricts T to SlingEventType (or a subtype of it).
T = TypeVar("T", bound=SlingEventType)


class SlingEventIterator(Generic[T], abc.Iterator):
    """Iterator wrapper over the events produced by a Sling replication.

    Iteration is forwarded unchanged to the wrapped event iterator. The Sling
    resource and the replication config that produced the events are stored
    next to it so that post-processing helpers, such as fetching column
    metadata, can build on them.
    """

    def __init__(
        self,
        events: Iterator[T],
        sling_cli: "SlingResource",
        replication_config: Dict[str, Any],
    ) -> None:
        """Keep the event stream together with the resource and config behind it.

        Args:
            events: The underlying iterator of Sling events to delegate to.
            sling_cli: The Sling resource associated with these events.
            replication_config: The replication config associated with these events.
        """
        self._replication_config = replication_config
        self._sling_cli = sling_cli
        self._inner_iterator = events

    def __iter__(self) -> "SlingEventIterator[T]":
        """Return this wrapper itself, as the iterator protocol requires."""
        return self

    def __next__(self) -> T:
        """Return the next event from the wrapped iterator.

        A ``StopIteration`` raised by the wrapped iterator propagates unchanged.
        """
        upcoming_event = next(self._inner_iterator)
        return upcoming_event

0 comments on commit c137304

Please sign in to comment.