Skip to content

Commit

Permalink
migrate examples to AssetExecutionContext (#14792)
Browse files Browse the repository at this point in the history
follow-up to #14760 for examples that require `AssetExecutionContext` to
be published first

## How I Tested These Changes

bk
#14770
  • Loading branch information
alangenfeld authored Jun 23, 2023
1 parent c0392b3 commit 553c06d
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 24 deletions.
6 changes: 3 additions & 3 deletions examples/quickstart_aws/quickstart_aws/assets/hackernews.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import matplotlib.pyplot as plt
import pandas as pd
import requests
from dagster import MetadataValue, OpExecutionContext, asset
from dagster import AssetExecutionContext, MetadataValue, asset
from dagster_aws.s3 import S3Resource
from wordcloud import STOPWORDS, WordCloud

Expand All @@ -23,7 +23,7 @@ def hackernews_topstory_ids() -> pd.DataFrame:

@asset(group_name="hackernews", compute_kind="HackerNews API")
def hackernews_topstories(
context: OpExecutionContext, hackernews_topstory_ids: pd.DataFrame
context: AssetExecutionContext, hackernews_topstory_ids: pd.DataFrame
) -> pd.DataFrame:
"""Get items based on story ids from the HackerNews items endpoint. It may take 1-2 minutes to fetch all 500 items.
Expand Down Expand Up @@ -53,7 +53,7 @@ def hackernews_topstories(

@asset(group_name="hackernews", compute_kind="Plot")
def hackernews_topstories_word_cloud(
context: OpExecutionContext,
context: AssetExecutionContext,
s3: S3Resource,
hackernews_topstories: pd.DataFrame,
) -> None:
Expand Down
6 changes: 3 additions & 3 deletions examples/quickstart_etl/quickstart_etl/assets/hackernews.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import matplotlib.pyplot as plt
import pandas as pd
import requests
from dagster import MetadataValue, OpExecutionContext, asset
from dagster import AssetExecutionContext, MetadataValue, asset
from wordcloud import STOPWORDS, WordCloud


Expand All @@ -22,7 +22,7 @@ def hackernews_topstory_ids() -> List[int]:

@asset(group_name="hackernews", compute_kind="HackerNews API")
def hackernews_topstories(
context: OpExecutionContext, hackernews_topstory_ids: List[int]
context: AssetExecutionContext, hackernews_topstory_ids: List[int]
) -> pd.DataFrame:
"""Get items based on story ids from the HackerNews items endpoint. It may take 1-2 minutes to fetch all 500 items.
Expand Down Expand Up @@ -52,7 +52,7 @@ def hackernews_topstories(

@asset(group_name="hackernews", compute_kind="Plot")
def hackernews_topstories_word_cloud(
context: OpExecutionContext, hackernews_topstories: pd.DataFrame
context: AssetExecutionContext, hackernews_topstories: pd.DataFrame
) -> bytes:
"""Exploratory analysis: Generate a word cloud from the current top 500 HackerNews top stories.
Embed the plot into a Markdown metadata for quick view.
Expand Down
6 changes: 3 additions & 3 deletions examples/quickstart_gcp/quickstart_gcp/assets/hackernews.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import matplotlib.pyplot as plt
import pandas as pd
import requests
from dagster import MetadataValue, OpExecutionContext, asset
from dagster import AssetExecutionContext, MetadataValue, asset
from wordcloud import STOPWORDS, WordCloud


Expand All @@ -21,7 +21,7 @@ def hackernews_topstory_ids() -> pd.DataFrame:

@asset(group_name="hackernews", compute_kind="HackerNews API")
def hackernews_topstories(
context: OpExecutionContext, hackernews_topstory_ids: pd.DataFrame
context: AssetExecutionContext, hackernews_topstory_ids: pd.DataFrame
) -> pd.DataFrame:
"""Get items based on story ids from the HackerNews items endpoint. It may take 1-2 minutes to fetch all 500 items.
Expand Down Expand Up @@ -54,7 +54,7 @@ def hackernews_topstories(

@asset(group_name="hackernews", compute_kind="Plot")
def hackernews_topstories_word_cloud(
context: OpExecutionContext, hackernews_topstories: pd.DataFrame
context: AssetExecutionContext, hackernews_topstories: pd.DataFrame
) -> None:
"""Exploratory analysis: Generate a word cloud from the current top 500 HackerNews top stories.
Embed the plot into a Markdown metadata for quick view.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import matplotlib.pyplot as plt
import pandas as pd
import requests
from dagster import MetadataValue, OpExecutionContext, asset
from dagster import AssetExecutionContext, MetadataValue, asset
from wordcloud import STOPWORDS, WordCloud


Expand All @@ -21,7 +21,7 @@ def hackernews_topstory_ids() -> pd.DataFrame:

@asset(group_name="hackernews", compute_kind="HackerNews API")
def hackernews_topstories(
context: OpExecutionContext, hackernews_topstory_ids: pd.DataFrame
context: AssetExecutionContext, hackernews_topstory_ids: pd.DataFrame
) -> pd.DataFrame:
"""Get items based on story ids from the HackerNews items endpoint. It may take 1-2 minutes to fetch all 500 items.
Expand Down Expand Up @@ -51,7 +51,7 @@ def hackernews_topstories(

@asset(group_name="hackernews", compute_kind="Plot")
def hackernews_topstories_word_cloud(
context: OpExecutionContext, hackernews_topstories: pd.DataFrame
context: AssetExecutionContext, hackernews_topstories: pd.DataFrame
) -> None:
"""Exploratory analysis: Generate a word cloud from the current top 500 HackerNews top stories.
Embed the plot into a Markdown metadata for quick view.
Expand Down
14 changes: 7 additions & 7 deletions examples/with_wandb/with_wandb/assets/advanced_example.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import wandb
from dagster import AssetIn, OpExecutionContext, asset
from dagster import AssetExecutionContext, AssetIn, asset
from dagster_wandb import WandbArtifactConfiguration

wandb_artifact_configuration: WandbArtifactConfiguration = {
Expand Down Expand Up @@ -79,11 +79,11 @@ def write_advanced_artifact() -> wandb.wandb_sdk.wandb_artifacts.Artifact:
},
output_required=False,
)
def get_table(context: OpExecutionContext, table: wandb.Table) -> None:
def get_table(context: AssetExecutionContext, table: wandb.Table) -> None:
"""Example that reads a W&B Table contained in an Artifact.
Args:
context (OpExecutionContext): Dagster execution context
context (AssetExecutionContext): Dagster execution context
table (wandb.Table): Table contained in our downloaded Artifact
Here, we use the integration to read the W&B Table object created in the previous asset.
Expand All @@ -108,11 +108,11 @@ def get_table(context: OpExecutionContext, table: wandb.Table) -> None:
},
output_required=False,
)
def get_path(context: OpExecutionContext, path: str) -> None:
def get_path(context: AssetExecutionContext, path: str) -> None:
"""Example that gets the local path of a file contained in an Artifact.
Args:
context (OpExecutionContext): Dagster execution context
context (AssetExecutionContext): Dagster execution context
path (str): Path in the local filesystem of the downloaded file
Here, we use the integration to collect the local of the file added through the 'add_dirs' in
Expand All @@ -133,12 +133,12 @@ def get_path(context: OpExecutionContext, path: str) -> None:
output_required=False,
)
def get_artifact(
context: OpExecutionContext, artifact: wandb.wandb_sdk.wandb_artifacts.Artifact
context: AssetExecutionContext, artifact: wandb.wandb_sdk.wandb_artifacts.Artifact
) -> None:
"""Example that gets the entire Artifact object.
Args:
context (OpExecutionContext): Dagster execution context
context (AssetExecutionContext): Dagster execution context
artifact (wandb.wandb_sdk.wandb_artifacts.Artifact): Downloaded Artifact object
Here, we use the integration to collect the entire W&B Artifact object created from in first
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import numpy
import onnxruntime as rt
from dagster import AssetIn, AssetOut, OpExecutionContext, asset, multi_asset
from dagster import AssetExecutionContext, AssetIn, AssetOut, asset, multi_asset
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
from sklearn.datasets import load_iris
Expand Down Expand Up @@ -36,7 +36,7 @@ def create_model_serialized_with_joblib():
}
},
)
def use_model_serialized_with_joblib(context: OpExecutionContext, my_joblib_serialized_model):
def use_model_serialized_with_joblib(context: AssetExecutionContext, my_joblib_serialized_model):
inference_result = my_joblib_serialized_model(1, 2)
context.log.info(inference_result) # Prints: 3
return inference_result
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import wandb
from dagster import AssetIn, OpExecutionContext, asset
from dagster import AssetExecutionContext, AssetIn, asset

MODEL_NAME = "my_model"

Expand Down Expand Up @@ -29,12 +29,12 @@ def write_model() -> wandb.wandb_sdk.wandb_artifacts.Artifact:
config_schema={"model_registry": str},
)
def promote_best_model_to_production(
context: OpExecutionContext, artifact: wandb.wandb_sdk.wandb_artifacts.Artifact
context: AssetExecutionContext, artifact: wandb.wandb_sdk.wandb_artifacts.Artifact
):
"""Example that links a model stored in a W&B Artifact to the Model Registry.
Args:
context (OpExecutionContext): Dagster execution context
context (AssetExecutionContext): Dagster execution context
artifact (wandb.wandb_sdk.wandb_artifacts.Artifact): Downloaded Artifact object
"""
# In a real scenario you would evaluate model performance
Expand Down

0 comments on commit 553c06d

Please sign in to comment.