Skip to content

Commit

Permalink
[dagster-fivetran] Implement FivetranEventIterator
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Dec 10, 2024
1 parent 62c3877 commit 49c4665
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from collections import abc
from typing import TYPE_CHECKING, Generic, Iterator, Union

from dagster import AssetMaterialization, MaterializeResult
from typing_extensions import TypeVar

if TYPE_CHECKING:
from dagster_fivetran.resources import FivetranWorkspace


FivetranEventType = Union[AssetMaterialization, MaterializeResult]
T = TypeVar("T", bound=FivetranEventType)


class FivetranEventIterator(Generic[T], abc.Iterator):
"""A wrapper around an iterator of Fivetran events which contains additional methods for
post-processing the events, such as fetching column metadata.
"""

def __init__(
self,
events: Iterator[T],
fivetran_workspace: "FivetranWorkspace",
) -> None:
self._inner_iterator = events
self._fivetran_workspace = fivetran_workspace

def __next__(self) -> T:
return next(self._inner_iterator)

def __iter__(self) -> "FivetranEventIterator[T]":
return self
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from requests.auth import HTTPBasicAuth
from requests.exceptions import RequestException

from dagster_fivetran.fivetran_event_iterator import FivetranEventIterator
from dagster_fivetran.translator import (
DagsterFivetranTranslator,
FivetranConnector,
Expand Down Expand Up @@ -1025,9 +1026,13 @@ def sync_and_poll(
Iterator[Union[AssetMaterialization, MaterializeResult]]: An iterator of MaterializeResult
or AssetMaterialization.
"""
return FivetranEventIterator(
events=self._sync_and_poll(context=context), fivetran_workspace=self
)

def _sync_and_poll(self, context: Union[OpExecutionContext, AssetExecutionContext]):
assets_def = context.assets_def
dagster_fivetran_translator = get_translator_from_fivetran_assets(assets_def)

connector_id = next(
check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id)
for spec in assets_def.specs
Expand Down

0 comments on commit 49c4665

Please sign in to comment.