Skip to content

Commit

Permalink
Move materialization logic to method
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Nov 25, 2024
1 parent e6ac0ac commit 07b6dc5
Showing 1 changed file with 40 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -941,30 +941,18 @@ def load_asset_specs(
workspace=self, dagster_fivetran_translator=dagster_fivetran_translator.__class__
)

def sync_and_poll(
self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None
def _generate_materialization(
self,
fivetran_output: FivetranOutput,
dagster_fivetran_translator: DagsterFivetranTranslator,
):
# TODO: Add docstrings
assets_def = context.assets_def
dagster_fivetran_translator = get_translator_from_fivetran_assets(assets_def)

connector_id = next(
check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id)
for spec in assets_def.specs
)

client = self.get_client()
fivetran_output = client.sync_and_poll(
connector_id=connector_id,
)
connector = FivetranConnector.from_connector_details(
connector_details=fivetran_output.connector_details
)
schema_config = FivetranSchemaConfig.from_schema_config_details(
schema_config_details=fivetran_output.schema_config
)

materialized_asset_keys = set()
for schema_source_name, schema in schema_config.schemas.items():
if not schema.enabled:
continue
Expand All @@ -988,7 +976,7 @@ def sync_and_poll(
)
).key

materialization = AssetMaterialization(
yield AssetMaterialization(
asset_key=asset_key,
description=f"Table generated via Fivetran sync: {schema.name}.{table.name}",
metadata={
Expand All @@ -1004,23 +992,42 @@ def sync_and_poll(
"table_source_name": table_source_name,
},
)
if not materialization:
continue

# scan through all tables actually created, if it was expected then emit an Output.
# otherwise, emit a runtime AssetMaterialization
if materialization.asset_key in context.selected_asset_keys:
yield Output(
value=None,
output_name=materialization.asset_key.to_python_identifier(),
metadata=materialization.metadata,
)
materialized_asset_keys.add(materialization.asset_key)
else:
context.log.warning(
f"An unexpected asset was materialized: {materialization.asset_key}"
)
yield materialization
def sync_and_poll(
self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None
):
# TODO: Add docstrings
assets_def = context.assets_def
dagster_fivetran_translator = get_translator_from_fivetran_assets(assets_def)

connector_id = next(
check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id)
for spec in assets_def.specs
)

client = self.get_client()
fivetran_output = client.sync_and_poll(
connector_id=connector_id,
)

materialized_asset_keys = set()
for materialization in self._generate_materialization(
fivetran_output=fivetran_output, dagster_fivetran_translator=dagster_fivetran_translator
):
# scan through all tables actually created, if it was expected then emit an Output.
# otherwise, emit a runtime AssetMaterialization
if materialization.asset_key in context.selected_asset_keys:
yield Output(
value=None,
output_name=materialization.asset_key.to_python_identifier(),
metadata=materialization.metadata,
)
materialized_asset_keys.add(materialization.asset_key)
else:
context.log.warning(
f"An unexpected asset was materialized: {materialization.asset_key}"
)
yield materialization

unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys
if unmaterialized_asset_keys:
Expand Down

0 comments on commit 07b6dc5

Please sign in to comment.