update context type hints
jamiedemaria committed Nov 8, 2023
1 parent a155dc0 commit a62bf15
Showing 113 changed files with 401 additions and 303 deletions.
11 changes: 9 additions & 2 deletions docs/content/concepts/assets/asset-checks.mdx
@@ -70,11 +70,18 @@ When defining an asset using the <PyObject object="asset" displayText="@asset" /
```python file=/concepts/assets/asset_checks/asset_with_check.py
import pandas as pd

-from dagster import AssetCheckResult, AssetCheckSpec, Definitions, Output, asset
+from dagster import (
+    AssetCheckResult,
+    AssetCheckSpec,
+    AssetExecutionContext,
+    Definitions,
+    Output,
+    asset,
+)


@asset(check_specs=[AssetCheckSpec(name="orders_id_has_no_nulls", asset="orders")])
-def orders(context):
+def orders(context: AssetExecutionContext):
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})

    # save the output and indicate that it's been saved
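The hunk above is cut off just after the DataFrame is created. For orientation, a sketch of how a check-bearing asset of this shape completes, yielding the `Output` and then an `AssetCheckResult`; the `passed` expression is illustrative rather than part of this diff:

```python
@asset(check_specs=[AssetCheckSpec(name="orders_id_has_no_nulls", asset="orders")])
def orders(context: AssetExecutionContext):
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})

    # save the output and indicate that it's been saved
    yield Output(value=None)

    # report the check result computed from the saved data
    yield AssetCheckResult(
        passed=bool(orders_df["order_id"].notna().all()),
    )
```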
8 changes: 4 additions & 4 deletions docs/content/concepts/assets/asset-observations.mdx
@@ -33,7 +33,7 @@ from dagster import AssetObservation, op


@op
-def observation_op(context):
+def observation_op(context: OpExecutionContext):
    df = read_df()
    context.log_event(
        AssetObservation(asset_key="observation_asset", metadata={"num_rows": len(df)})
@@ -59,7 +59,7 @@ from dagster import AssetMaterialization, AssetObservation, MetadataValue, op


@op
-def observes_dataset_op(context):
+def observes_dataset_op(context: OpExecutionContext):
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    context.log_event(
@@ -93,15 +93,15 @@ height={1146}
If you are observing a single slice of an asset (e.g. a single day's worth of data on a larger table), rather than mutating or creating it entirely, you can indicate this to Dagster by including the `partition` argument on the object.

```python file=/concepts/assets/observations.py startafter=start_partitioned_asset_observation endbefore=end_partitioned_asset_observation
-from dagster import AssetMaterialization, Config, op
+from dagster import AssetMaterialization, Config, op, OpExecutionContext


class MyOpConfig(Config):
    date: str


@op
-def partitioned_dataset_op(context, config: MyOpConfig):
+def partitioned_dataset_op(context: OpExecutionContext, config: MyOpConfig):
    partition_date = config.date
    df = read_df_for_date(partition_date)
    context.log_event(
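The `context.log_event(` call above is truncated by the collapsed diff; the point of the passage is that the observation carries the partition. A sketch of the completed call, with an illustrative asset key:

```python
@op
def partitioned_dataset_op(context: OpExecutionContext, config: MyOpConfig):
    partition_date = config.date
    df = read_df_for_date(partition_date)
    context.log_event(
        # attaching `partition` tells Dagster only this slice was observed
        AssetObservation(asset_key="my_dataset", partition=partition_date)
    )
```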
4 changes: 2 additions & 2 deletions docs/content/concepts/assets/multi-assets.mdx
@@ -31,7 +31,7 @@ The function responsible for computing the contents of any software-defined asse
The easiest way to create a multi-asset is with the <PyObject object="multi_asset" decorator /> decorator. This decorator functions similarly to the <PyObject object="asset" decorator /> decorator, but requires an `outs` parameter specifying each output asset of the function.

```python file=/concepts/assets/multi_assets.py startafter=start_basic_multi_asset endbefore=end_basic_multi_asset
-from dagster import AssetOut, multi_asset
+from dagster import AssetOut, multi_asset, AssetExecutionContext


@multi_asset(
@@ -100,7 +100,7 @@ from dagster import AssetOut, Output, multi_asset
    },
    can_subset=True,
)
-def split_actions(context):
+def split_actions(context: AssetExecutionContext):
    if "a" in context.selected_output_names:
        yield Output(value=123, output_name="a")
    if "b" in context.selected_output_names:
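The first hunk of this file breaks off immediately after `@multi_asset(`; for context, a minimal complete multi-asset of the kind the prose describes looks roughly like this (asset names illustrative):

```python
from dagster import AssetOut, multi_asset


@multi_asset(
    outs={
        "my_string_asset": AssetOut(),
        "my_int_asset": AssetOut(),
    }
)
def my_function():
    # one returned value per declared output asset
    return "abc", 123
```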
2 changes: 1 addition & 1 deletion docs/content/concepts/assets/software-defined-assets.mdx
@@ -553,7 +553,7 @@ Consider the following asset that uses a context object:

```python file=/concepts/assets/asset_testing.py startafter=start_with_context_asset endbefore=end_with_context_asset
@asset
-def uses_context(context):
+def uses_context(context: AssetExecutionContext):
    context.log.info(context.run_id)
    return "bar"
```
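With the context argument now typed, the asset can still be tested by direct invocation; a sketch, assuming `build_asset_context` is available in the Dagster version in use:

```python
from dagster import build_asset_context


def test_uses_context():
    # build a context explicitly and pass it as the first argument
    context = build_asset_context()
    assert uses_context(context) == "bar"
```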
27 changes: 17 additions & 10 deletions docs/content/concepts/configuration/config-schema-legacy.mdx
@@ -44,18 +44,18 @@ Below we define a simple op and asset with identical `config_schemas` defining a

```python file=/concepts/configuration/configurable_op_asset_resource.py startafter=start endbefore=end
@op(config_schema={"person_name": str})
-def op_using_config(context):
+def op_using_config(context: OpExecutionContext):
    return f'hello {context.op_config["person_name"]}'


@asset(config_schema={"person_name": str})
-def asset_using_config(context):
+def asset_using_config(context: AssetExecutionContext):
    # Note how asset config is also accessed with context.op_config
    return f'hello {context.op_config["person_name"]}'


@resource(config_schema={"url": str})
-def resource_using_config(context):
+def resource_using_config(context: InitResourceContext):
    return MyDatabaseConnection(context.resource_config["url"])
```
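For reference, these definitions are driven by run config supplied at launch time; a sketch of materializing the asset with a config value (the name "Alice" is illustrative):

```python
from dagster import materialize

# asset config lives under the "ops" key of run_config
asset_result = materialize(
    [asset_using_config],
    run_config={"ops": {"asset_using_config": {"config": {"person_name": "Alice"}}}},
)
```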

@@ -143,18 +143,25 @@ asset_result = materialize(
A common use case for configuration is passing secrets to connect to external services. Resources, which can be used to model connections to external services, accept secrets as configuration values. These secrets can be read from your environment variables:

```python file=/concepts/configuration/env_vars_config.py startafter=start_database_example endbefore=end_database_example
-from dagster import StringSource, job, op, resource
+from dagster import (
+    InitResourceContext,
+    OpExecutionContext,
+    StringSource,
+    job,
+    op,
+    resource,
+)


@resource(config_schema={"username": StringSource, "password": StringSource})
-def database_client(context):
+def database_client(context: InitResourceContext):
    username = context.resource_config["username"]
    password = context.resource_config["password"]
    ...


@op(required_resource_keys={"database"})
-def get_one(context):
+def get_one(context: OpExecutionContext):
    context.resources.database.execute_query("SELECT 1")


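The collapsed remainder of this example wires the resource into a job and supplies the secrets through run config; with `StringSource`, the values can be drawn from environment variables. A sketch (variable names illustrative):

```python
run_config = {
    "resources": {
        "database": {
            "config": {
                # StringSource accepts {"env": ...} to read an env var at launch
                "username": {"env": "SYSTEM_USER"},
                "password": {"env": "SYSTEM_PASSWORD"},
            }
        }
    }
}
```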
@@ -190,15 +197,15 @@ It defaults to <PyObject module="dagster" object="Any" /> type, meaning Dagster

```python file=/concepts/configuration/make_values_resource_any.py startafter=start_file_example endbefore=end_file_example
@op(required_resource_keys={"file_dir"})
-def add_file(context):
+def add_file(context: OpExecutionContext):
    filename = f"{context.resources.file_dir}/new_file.txt"
    open(filename, "x", encoding="utf8").close()

    context.log.info(f"Created file: {filename}")


@op(required_resource_keys={"file_dir"})
-def total_num_files(context):
+def total_num_files(context: OpExecutionContext):
    files_in_dir = os.listdir(context.resources.file_dir)
    context.log.info(f"Total number of files: {len(files_in_dir)}")

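For orientation, these ops are typically bound to a `make_values_resource()` value, with the directory passed in at run time; a sketch (path illustrative):

```python
from dagster import job, make_values_resource


# with no arguments, make_values_resource accepts any config value
@job(resource_defs={"file_dir": make_values_resource()})
def file_dir_job():
    add_file()
    total_num_files()


file_dir_job.execute_in_process(
    run_config={"resources": {"file_dir": {"config": "/my_files/"}}}
)
```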
@@ -221,15 +228,15 @@ Alternatively, if you want to provide different config values for each op within

```python file=/concepts/configuration/make_values_resource_config_schema.py startafter=start_file_example endbefore=end_file_example
@op(required_resource_keys={"file_dirs"})
-def write_file(context):
+def write_file(context: OpExecutionContext):
    filename = f"{context.resources.file_dirs['write_file_dir']}/new_file.txt"
    open(filename, "x", encoding="utf8").close()

    context.log.info(f"Created file: {filename}")


@op(required_resource_keys={"file_dirs"})
-def total_num_files(context):
+def total_num_files(context: OpExecutionContext):
    files_in_dir = os.listdir(context.resources.file_dirs["count_file_dir"])
    context.log.info(f"Total number of files: {len(files_in_dir)}")

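In the per-op variant, `make_values_resource` is given one field per directory, matching the keys the ops read above; a sketch of the wiring:

```python
from dagster import job, make_values_resource


@job(
    resource_defs={
        # one schema field per directory key the ops index into
        "file_dirs": make_values_resource(write_file_dir=str, count_file_dir=str)
    }
)
def file_dirs_job():
    write_file()
    total_num_files()
```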
6 changes: 3 additions & 3 deletions docs/content/concepts/configuration/configured.mdx
@@ -114,7 +114,7 @@ def unsigned_s3_session(config):
You can use the `configured` API with any definition type in the same way. For example, to configure an op, you can simply invoke `configured` on the op definition:

```python file=/concepts/configuration/configured_op_example.py
-from dagster import Field, configured, op
+from dagster import Field, OpExecutionContext, configured, op


@op(
@@ -123,7 +123,7 @@ from dagster import Field, configured, op
"word": Field(str, is_required=False, default_value="hello"),
}
)
def example(context):
def example(context: OpExecutionContext):
for _ in range(context.op_config["iterations"]):
context.log.info(context.op_config["word"])

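Invoking `configured` on this op produces a copy with the config baked in; a sketch of the call pattern, with the new name and config values being illustrative:

```python
# returns a new op identical to `example` but with its config pre-applied
new_example = configured(example, name="new_example")(
    {"iterations": 6, "word": "wheaties"}
)
```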
@@ -206,7 +206,7 @@ When using the decorator syntax (`@configured`), the resulting op definition wil
    },
    ins={"xs": In(List[Int])},
)
-def get_dataset(context, xs):
+def get_dataset(context: OpExecutionContext, xs):
    if context.op_config["is_sample"]:
        return xs[:5]
    else:
42 changes: 22 additions & 20 deletions docs/content/concepts/io-management/io-managers.mdx
@@ -391,7 +391,7 @@ If your I/O manager is more complex, or needs to manage internal state, it may m
In this case, we implement a stateful I/O manager which maintains a cache.

```python file=/concepts/io_management/custom_io_manager.py startafter=start_io_manager_factory_marker endbefore=end_io_manager_factory_marker
-from dagster import IOManager, ConfigurableIOManagerFactory
+from dagster import IOManager, ConfigurableIOManagerFactory, OutputContext, InputContext
import requests


Expand All @@ -401,10 +401,10 @@ class ExternalIOManager(IOManager):
        # setup stateful cache
        self._cache = {}

-    def handle_output(self, context, obj):
+    def handle_output(self, context: OutputContext, obj):
        ...

-    def load_input(self, context):
+    def load_input(self, context: InputContext):
        if context.asset_key in self._cache:
            return self._cache[context.asset_key]
        ...
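The collapsed remainder of this hunk pairs the manager with a `ConfigurableIOManagerFactory`, whose `create_io_manager` method constructs and returns the stateful instance; a sketch (the config field and constructor argument are illustrative):

```python
class ExternalIOManagerFactory(ConfigurableIOManagerFactory):
    api_token: str  # illustrative config field

    def create_io_manager(self, context) -> ExternalIOManager:
        # a fresh manager (and cache) is created for each run
        return ExternalIOManager(self.api_token)
```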
@@ -470,10 +470,10 @@ class MyPartitionedIOManager(IOManager):
        else:
            return "/".join(context.asset_key.path)

-    def handle_output(self, context, obj):
+    def handle_output(self, context: OutputContext, obj):
        write_csv(self._get_path(context), obj)

-    def load_input(self, context):
+    def load_input(self, context: InputContext):
        return read_csv(self._get_path(context))
```

@@ -542,7 +542,7 @@ Since the method for loading an input is directly affected by the way the corres
```python file=/concepts/io_management/input_managers.py startafter=start_plain_input_manager endbefore=end_plain_input_manager
# in this case PandasIOManager is an existing IO Manager
class MyNumpyLoader(PandasIOManager):
-    def load_input(self, context) -> np.ndarray:
+    def load_input(self, context: InputContext) -> np.ndarray:
        file_path = "path/to/dataframe"
        array = np.genfromtxt(file_path, delimiter=",", dtype=None)
        return array
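To route one particular input through this loader rather than the default I/O manager, the input is pointed at it by resource key; a sketch for an asset (key name illustrative):

```python
from dagster import AssetIn, asset


@asset(ins={"upstream": AssetIn(input_manager_key="numpy_io_manager")})
def downstream(upstream: np.ndarray):
    # upstream was written by PandasIOManager but arrives here as an ndarray
    ...
```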
@@ -571,19 +571,19 @@ class BetterPandasIOManager(ConfigurableIOManager):
f"{output_context.step_key}_{output_context.name}.csv",
)

def handle_output(self, context, obj: pd.DataFrame):
def handle_output(self, context: OutputContext, obj: pd.DataFrame):
file_path = self._get_path(context)
os.makedirs(os.path.dirname(file_path), exist_ok=True)
if obj is not None:
obj.to_csv(file_path, index=False)

def load_input(self, context) -> pd.DataFrame:
def load_input(self, context: InputContext) -> pd.DataFrame:
return pd.read_csv(self._get_path(context.upstream_output))


# write a subclass that uses _get_path for your custom loading logic
class MyBetterNumpyLoader(BetterPandasIOManager):
def load_input(self, context) -> np.ndarray:
def load_input(self, context: InputContext) -> np.ndarray:
file_path = self._get_path(context.upstream_output)
array = np.genfromtxt(file_path, delimiter=",", dtype=None)
return array
@@ -616,12 +616,12 @@ from dagster import ConfigurableIOManager, io_manager


class DataframeTableIOManager(ConfigurableIOManager):
-    def handle_output(self, context, obj):
+    def handle_output(self, context: OutputContext, obj):
        # name is the name given to the Out that we're storing for
        table_name = context.name
        write_dataframe_to_table(name=table_name, dataframe=obj)

-    def load_input(self, context):
+    def load_input(self, context: InputContext):
        # upstream_output.name is the name given to the Out that we're loading for
        table_name = context.upstream_output.name
        return read_dataframe_from_table(name=table_name)
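For context, this manager is bound to the ops through the `io_manager` resource key on the job; a sketch (op names illustrative):

```python
from dagster import job


@job(resource_defs={"io_manager": DataframeTableIOManager()})
def my_job():
    # op_2 loads op_1's output through load_input above
    op_2(op_1())
```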
@@ -724,12 +724,12 @@ In this case, the table names are encoded in the job definition. If, instead, yo

```python file=/concepts/io_management/metadata.py startafter=io_manager_start_marker endbefore=io_manager_end_marker
class MyIOManager(ConfigurableIOManager):
-    def handle_output(self, context, obj):
+    def handle_output(self, context: OutputContext, obj):
        table_name = context.metadata["table"]
        schema = context.metadata["schema"]
        write_dataframe_to_table(name=table_name, schema=schema, dataframe=obj)

-    def load_input(self, context):
+    def load_input(self, context: InputContext):
        table_name = context.upstream_output.metadata["table"]
        schema = context.upstream_output.metadata["schema"]
        return read_dataframe_from_table(name=table_name, schema=schema)
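The metadata this manager reads is declared where the outputs are defined; a sketch of both declaration sites (table and schema values illustrative):

```python
from dagster import Out, asset, op


# for an op, attach metadata to its Out
@op(out=Out(metadata={"schema": "some_schema", "table": "some_table"}))
def op_1():
    ...


# for an asset, attach metadata on the decorator
@asset(metadata={"schema": "some_schema", "table": "some_table"})
def my_asset():
    ...
```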
@@ -743,7 +743,7 @@ In this example, we store `upstream_asset` as a Pandas DataFrame, and we write a

```python file=/concepts/assets/asset_input_managers_numpy.py startafter=start_numpy_example endbefore=end_numpy_example
class PandasAssetIOManager(ConfigurableIOManager):
-    def handle_output(self, context, obj):
+    def handle_output(self, context: OutputContext, obj):
        file_path = self._get_path(context)
        store_pandas_dataframe(name=file_path, table=obj)

@@ -753,13 +753,13 @@
f"{context.asset_key.path[-1]}.csv",
)

-    def load_input(self, context) -> pd.DataFrame:
+    def load_input(self, context: InputContext) -> pd.DataFrame:
        file_path = self._get_path(context)
        return load_pandas_dataframe(name=file_path)


class NumpyAssetIOManager(PandasAssetIOManager):
-    def load_input(self, context) -> np.ndarray:
+    def load_input(self, context: InputContext) -> np.ndarray:
        file_path = self._get_path(context)
        return load_numpy_array(name=file_path)

@@ -793,7 +793,9 @@ Here's an example for a simple I/O manager that stores outputs in an in-memory d

```python file=/concepts/io_management/test_io_manager.py
from dagster import (
+    InputContext,
    IOManager,
+    OutputContext,
    build_input_context,
    build_output_context,
)
@@ -803,10 +805,10 @@ class MyIOManager(IOManager):
    def __init__(self):
        self.storage_dict = {}

-    def handle_output(self, context, obj):
+    def handle_output(self, context: OutputContext, obj):
        self.storage_dict[(context.step_key, context.name)] = obj

-    def load_input(self, context):
+    def load_input(self, context: InputContext):
        return self.storage_dict[
            (context.upstream_output.step_key, context.upstream_output.name)
        ]
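The collapsed tail of this file is where the newly imported `build_output_context` and `build_input_context` come into play; a sketch of such a test (step and output names illustrative):

```python
def test_my_io_manager():
    manager = MyIOManager()

    # simulate handling an output named "abc" from step "123"
    output_context = build_output_context(name="abc", step_key="123")
    manager.handle_output(output_context, 5)

    # loading the corresponding input should round-trip the value
    input_context = build_input_context(upstream_output=output_context)
    assert manager.load_input(input_context) == 5
```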
@@ -835,13 +837,13 @@ Sometimes, you may want to record some metadata while handling an output in an I

```python file=/concepts/io_management/custom_io_manager.py startafter=start_metadata_marker endbefore=end_metadata_marker
class DataframeTableIOManagerWithMetadata(ConfigurableIOManager):
-    def handle_output(self, context, obj):
+    def handle_output(self, context: OutputContext, obj):
        table_name = context.name
        write_dataframe_to_table(name=table_name, dataframe=obj)

        context.add_output_metadata({"num_rows": len(obj), "table_name": table_name})

-    def load_input(self, context):
+    def load_input(self, context: InputContext):
        table_name = context.upstream_output.name
        return read_dataframe_from_table(name=table_name)
```