diff --git a/docs/content/concepts/io-management/io-managers.mdx b/docs/content/concepts/io-management/io-managers.mdx index 20a888627f218..bdb016115fedf 100644 --- a/docs/content/concepts/io-management/io-managers.mdx +++ b/docs/content/concepts/io-management/io-managers.mdx @@ -623,8 +623,9 @@ class DataframeTableIOManager(ConfigurableIOManager): def load_input(self, context: InputContext): # upstream_output.name is the name given to the Out that we're loading for - table_name = context.upstream_output.name - return read_dataframe_from_table(name=table_name) + if context.upstream_output: + table_name = context.upstream_output.name + return read_dataframe_from_table(name=table_name) @job(resource_defs={"io_manager": DataframeTableIOManager()}) @@ -725,14 +726,22 @@ In this case, the table names are encoded in the job definition. If, instead, yo ```python file=/concepts/io_management/metadata.py startafter=io_manager_start_marker endbefore=io_manager_end_marker class MyIOManager(ConfigurableIOManager): def handle_output(self, context: OutputContext, obj): - table_name = context.metadata["table"] - schema = context.metadata["schema"] - write_dataframe_to_table(name=table_name, schema=schema, dataframe=obj) + if context.metadata: + table_name = context.metadata["table"] + schema = context.metadata["schema"] + write_dataframe_to_table(name=table_name, schema=schema, dataframe=obj) + else: + raise Exception( + f"op {context.op_def.name} doesn't have schema and metadata set" + ) def load_input(self, context: InputContext): - table_name = context.upstream_output.metadata["table"] - schema = context.upstream_output.metadata["schema"] - return read_dataframe_from_table(name=table_name, schema=schema) + if context.upstream_output and context.upstream_output.metadata: + table_name = context.upstream_output.metadata["table"] + schema = context.upstream_output.metadata["schema"] + return read_dataframe_from_table(name=table_name, schema=schema) + else: + raise Exception("Upstream output doesn't have schema and metadata set") ``` ### Per-input loading in assets @@ -809,9 +818,10 @@ class MyIOManager(IOManager): self.storage_dict[(context.step_key, context.name)] = obj def load_input(self, context: InputContext): - return self.storage_dict[ - (context.upstream_output.step_key, context.upstream_output.name) - ] + if context.upstream_output: + return self.storage_dict[ + (context.upstream_output.step_key, context.upstream_output.name) + ] def test_my_io_manager_handle_output(): @@ -844,8 +854,9 @@ class DataframeTableIOManagerWithMetadata(ConfigurableIOManager): context.add_output_metadata({"num_rows": len(obj), "table_name": table_name}) def load_input(self, context: InputContext): - table_name = context.upstream_output.name - return read_dataframe_from_table(name=table_name) + if context.upstream_output: + table_name = context.upstream_output.name + return read_dataframe_from_table(name=table_name) ``` Any entries yielded this way will be attached to the `Handled Output` event for this output. diff --git a/docs/content/concepts/io-management/unconnected-inputs.mdx b/docs/content/concepts/io-management/unconnected-inputs.mdx index f652a788c9258..7c48c9c7bf526 100644 --- a/docs/content/concepts/io-management/unconnected-inputs.mdx +++ b/docs/content/concepts/io-management/unconnected-inputs.mdx @@ -225,7 +225,8 @@ class MyIOManager(ConfigurableIOManager): write_dataframe_to_table(name=table_name, dataframe=obj) def load_input(self, context: InputContext): - return read_dataframe_from_table(name=context.upstream_output.name) + if context.upstream_output: + return read_dataframe_from_table(name=context.upstream_output.name) @input_manager diff --git a/docs/content/concepts/partitions-schedules-sensors/partitioning-assets.mdx b/docs/content/concepts/partitions-schedules-sensors/partitioning-assets.mdx index 59545d6ef967c..ce667b8814c51 100644 --- a/docs/content/concepts/partitions-schedules-sensors/partitioning-assets.mdx +++ b/docs/content/concepts/partitions-schedules-sensors/partitioning-assets.mdx @@ -108,7 +108,8 @@ from dagster import ( ) ) def multi_partitions_asset(context: AssetExecutionContext): - context.log.info(context.partition_key.keys_by_dimension) + if isinstance(context.partition_key, MultiPartitionKey): + context.log.info(context.partition_key.keys_by_dimension) ``` In this example, the asset would contain a partition for each combination of color and date: @@ -166,8 +167,8 @@ def image_sensor(context: SensorEvaluationContext): new_images = [ img_filename for img_filename in os.listdir(os.getenv("MY_DIRECTORY")) - if not context.instance.has_dynamic_partition( - images_partitions_def.name, img_filename + if not images_partitions_def.has_partition_key( + img_filename, dynamic_partitions_store=context.instance ) ] diff --git a/docs/content/guides/dagster/managing-ml.mdx b/docs/content/guides/dagster/managing-ml.mdx index 0c7d457da9718..7f5967038dedb 100644 --- a/docs/content/guides/dagster/managing-ml.mdx +++ b/docs/content/guides/dagster/managing-ml.mdx @@ -129,15 +129,23 @@ def conditional_machine_learning_model(context: AssetExecutionContext): AssetKey(["conditional_machine_learning_model"]) ) if materialization is None: - yield Output(reg, metadata={"model_accuracy": reg.score(X_test, y_test)}) + yield Output(reg, metadata={"model_accuracy": float(reg.score(X_test, y_test))}) else: - previous_model_accuracy = materialization.asset_materialization.metadata[ - "model_accuracy" - ] + previous_model_accuracy = None + if materialization.asset_materialization and isinstance( + materialization.asset_materialization.metadata["model_accuracy"].value, + float, + ): + previous_model_accuracy = float( + materialization.asset_materialization.metadata["model_accuracy"].value + ) new_model_accuracy = reg.score(X_test, y_test) - if new_model_accuracy > previous_model_accuracy: - yield Output(reg, metadata={"model_accuracy": new_model_accuracy}) + if ( + previous_model_accuracy is None + or new_model_accuracy > previous_model_accuracy + ): + yield Output(reg, metadata={"model_accuracy": float(new_model_accuracy)}) ``` A sensor can be set up that triggers if an asset fails to materialize. Alerts can be customized and sent through e-mail or natively through Slack. In this example, a Slack message is sent anytime the `ml_job` fails. diff --git a/docs/content/integrations/bigquery/reference.mdx b/docs/content/integrations/bigquery/reference.mdx index 5ef8d662ba968..aacaf173024d0 100644 --- a/docs/content/integrations/bigquery/reference.mdx +++ b/docs/content/integrations/bigquery/reference.mdx @@ -231,7 +231,7 @@ from dagster import ( }, ) def iris_data_partitioned(context: AssetExecutionContext) -> pd.DataFrame: - partition = partition = context.partition_key.keys_by_dimension + partition = context.partition_key.keys_by_dimension species = partition["species"] date = partition["date"] diff --git a/docs/content/integrations/duckdb/reference.mdx b/docs/content/integrations/duckdb/reference.mdx index 68c58be8ce67d..683be159106ad 100644 --- a/docs/content/integrations/duckdb/reference.mdx +++ b/docs/content/integrations/duckdb/reference.mdx @@ -232,7 +232,7 @@ from dagster import ( metadata={"partition_expr": {"date": "TO_TIMESTAMP(TIME)", "species": "SPECIES"}}, ) def iris_dataset_partitioned(context: AssetExecutionContext) -> pd.DataFrame: - partition = partition = context.partition_key.keys_by_dimension + partition = context.partition_key.keys_by_dimension species = partition["species"] date = partition["date"] diff --git a/docs/content/integrations/snowflake/reference.mdx b/docs/content/integrations/snowflake/reference.mdx index fa6261caf28ea..76c9a625813ca 100644 --- a/docs/content/integrations/snowflake/reference.mdx +++ b/docs/content/integrations/snowflake/reference.mdx @@ -232,6 +232,7 @@ import pandas as pd from dagster import ( AssetExecutionContext, DailyPartitionsDefinition, + MultiPartitionKey, MultiPartitionsDefinition, StaticPartitionsDefinition, asset, @@ -252,7 +253,7 @@ from dagster import ( }, ) def iris_dataset_partitioned(context: AssetExecutionContext) -> pd.DataFrame: - partition = partition = context.partition_key.keys_by_dimension + partition = context.partition_key.keys_by_dimension species = partition["species"] date = partition["date"] diff --git a/examples/docs_snippets/docs_snippets/concepts/io_management/custom_io_manager.py b/examples/docs_snippets/docs_snippets/concepts/io_management/custom_io_manager.py index e090d625e2dfb..93e3bb7e4e067 100755 --- a/examples/docs_snippets/docs_snippets/concepts/io_management/custom_io_manager.py +++ b/examples/docs_snippets/docs_snippets/concepts/io_management/custom_io_manager.py @@ -110,8 +110,9 @@ def handle_output(self, context: OutputContext, obj): def load_input(self, context: InputContext): # upstream_output.name is the name given to the Out that we're loading for - table_name = context.upstream_output.name - return read_dataframe_from_table(name=table_name) + if context.upstream_output: + table_name = context.upstream_output.name + return read_dataframe_from_table(name=table_name) @job(resource_defs={"io_manager": DataframeTableIOManager()}) @@ -131,8 +132,9 @@ def handle_output(self, context: OutputContext, obj): context.add_output_metadata({"num_rows": len(obj), "table_name": table_name}) def load_input(self, context: InputContext): - table_name = context.upstream_output.name - return read_dataframe_from_table(name=table_name) + if context.upstream_output: + table_name = context.upstream_output.name + return read_dataframe_from_table(name=table_name) # end_metadata_marker diff --git a/examples/docs_snippets/docs_snippets/concepts/io_management/input_managers.py b/examples/docs_snippets/docs_snippets/concepts/io_management/input_managers.py index 86892d5966cdf..d4e5492031ba1 100644 --- a/examples/docs_snippets/docs_snippets/concepts/io_management/input_managers.py +++ b/examples/docs_snippets/docs_snippets/concepts/io_management/input_managers.py @@ -205,7 +205,8 @@ def handle_output(self, context: OutputContext, obj): write_dataframe_to_table(name=table_name, dataframe=obj) def load_input(self, context: InputContext): - return read_dataframe_from_table(name=context.upstream_output.name) + if context.upstream_output: + return read_dataframe_from_table(name=context.upstream_output.name) @input_manager diff --git a/examples/docs_snippets/docs_snippets/concepts/io_management/metadata.py b/examples/docs_snippets/docs_snippets/concepts/io_management/metadata.py index 97078864c1e84..90459dc5e27ea 100755 --- a/examples/docs_snippets/docs_snippets/concepts/io_management/metadata.py +++ b/examples/docs_snippets/docs_snippets/concepts/io_management/metadata.py @@ -39,14 +39,22 @@ def op_2(_input_dataframe): # io_manager_start_marker class MyIOManager(ConfigurableIOManager): def handle_output(self, context: OutputContext, obj): - table_name = context.metadata["table"] - schema = context.metadata["schema"] - write_dataframe_to_table(name=table_name, schema=schema, dataframe=obj) + if context.metadata: + table_name = context.metadata["table"] + schema = context.metadata["schema"] + write_dataframe_to_table(name=table_name, schema=schema, dataframe=obj) + else: + raise Exception( + f"op {context.op_def.name} doesn't have schema and metadata set" + ) def load_input(self, context: InputContext): - table_name = context.upstream_output.metadata["table"] - schema = context.upstream_output.metadata["schema"] - return read_dataframe_from_table(name=table_name, schema=schema) + if context.upstream_output and context.upstream_output.metadata: + table_name = context.upstream_output.metadata["table"] + schema = context.upstream_output.metadata["schema"] + return read_dataframe_from_table(name=table_name, schema=schema) + else: + raise Exception("Upstream output doesn't have schema and metadata set") # io_manager_end_marker diff --git a/examples/docs_snippets/docs_snippets/concepts/io_management/output_config.py b/examples/docs_snippets/docs_snippets/concepts/io_management/output_config.py index ba63f63cdf2dc..aaeae90b4aeff 100755 --- a/examples/docs_snippets/docs_snippets/concepts/io_management/output_config.py +++ b/examples/docs_snippets/docs_snippets/concepts/io_management/output_config.py @@ -30,8 +30,9 @@ def handle_output(self, context: OutputContext, obj): write_dataframe_to_table(name=table_name, dataframe=obj) def load_input(self, context: InputContext): - table_name = context.upstream_output.config["table"] - return read_dataframe_from_table(name=table_name) + if context.upstream_output: + table_name = context.upstream_output.config["table"] + return read_dataframe_from_table(name=table_name) @io_manager(output_config_schema={"table": str}) diff --git a/examples/docs_snippets/docs_snippets/concepts/io_management/test_io_manager.py b/examples/docs_snippets/docs_snippets/concepts/io_management/test_io_manager.py index 633b5f32fcdb6..47562073247e0 100644 --- a/examples/docs_snippets/docs_snippets/concepts/io_management/test_io_manager.py +++ b/examples/docs_snippets/docs_snippets/concepts/io_management/test_io_manager.py @@ -15,9 +15,10 @@ def handle_output(self, context: OutputContext, obj): self.storage_dict[(context.step_key, context.name)] = obj def load_input(self, context: InputContext): - return self.storage_dict[ - (context.upstream_output.step_key, context.upstream_output.name) - ] + if context.upstream_output: + return self.storage_dict[ + (context.upstream_output.step_key, context.upstream_output.name) + ] def test_my_io_manager_handle_output(): diff --git a/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/dynamic_partitioned_asset.py b/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/dynamic_partitioned_asset.py index ecb378c18226a..672d27486678b 100644 --- a/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/dynamic_partitioned_asset.py +++ b/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/dynamic_partitioned_asset.py @@ -37,8 +37,8 @@ def image_sensor(context: SensorEvaluationContext): new_images = [ img_filename for img_filename in os.listdir(os.getenv("MY_DIRECTORY")) - if not context.instance.has_dynamic_partition( - images_partitions_def.name, img_filename + if not images_partitions_def.has_partition_key( + img_filename, dynamic_partitions_store=context.instance ) ] diff --git a/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/multipartitions_asset.py b/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/multipartitions_asset.py index 55841b8fca836..44d47e749d10c 100644 --- a/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/multipartitions_asset.py +++ b/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/multipartitions_asset.py @@ -17,7 +17,8 @@ ) ) def multi_partitions_asset(context: AssetExecutionContext): - context.log.info(context.partition_key.keys_by_dimension) + if isinstance(context.partition_key, MultiPartitionKey): + context.log.info(context.partition_key.keys_by_dimension) # end_multi_partitions_marker diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/managing_ml/managing_ml_code.py b/examples/docs_snippets/docs_snippets/guides/dagster/managing_ml/managing_ml_code.py index 2f78649ef05f0..0d1f0a0123bf5 100644 --- a/examples/docs_snippets/docs_snippets/guides/dagster/managing_ml/managing_ml_code.py +++ b/examples/docs_snippets/docs_snippets/guides/dagster/managing_ml/managing_ml_code.py @@ -97,15 +97,23 @@ def conditional_machine_learning_model(context: AssetExecutionContext): AssetKey(["conditional_machine_learning_model"]) ) if materialization is None: - yield Output(reg, metadata={"model_accuracy": reg.score(X_test, y_test)}) + yield Output(reg, metadata={"model_accuracy": float(reg.score(X_test, y_test))}) else: - previous_model_accuracy = materialization.asset_materialization.metadata[ - "model_accuracy" - ] + previous_model_accuracy = None + if materialization.asset_materialization and isinstance( + materialization.asset_materialization.metadata["model_accuracy"].value, + float, + ): + previous_model_accuracy = float( + materialization.asset_materialization.metadata["model_accuracy"].value + ) new_model_accuracy = reg.score(X_test, y_test) - if new_model_accuracy > previous_model_accuracy: - yield Output(reg, metadata={"model_accuracy": new_model_accuracy}) + if ( + previous_model_accuracy is None + or new_model_accuracy > previous_model_accuracy + ): + yield Output(reg, metadata={"model_accuracy": float(new_model_accuracy)}) ## conditional_monitoring_end diff --git a/examples/docs_snippets/docs_snippets/integrations/bigquery/reference/multi_partition.py b/examples/docs_snippets/docs_snippets/integrations/bigquery/reference/multi_partition.py index 3a385245baa5f..b2c185e081381 100644 --- a/examples/docs_snippets/docs_snippets/integrations/bigquery/reference/multi_partition.py +++ b/examples/docs_snippets/docs_snippets/integrations/bigquery/reference/multi_partition.py @@ -29,7 +29,7 @@ def get_iris_data_for_date(*args, **kwargs): }, ) def iris_data_partitioned(context: AssetExecutionContext) -> pd.DataFrame: - partition = partition = context.partition_key.keys_by_dimension + partition = context.partition_key.keys_by_dimension # type: ignore species = partition["species"] date = partition["date"] diff --git a/examples/docs_snippets/docs_snippets/integrations/duckdb/reference/multi_partition.py b/examples/docs_snippets/docs_snippets/integrations/duckdb/reference/multi_partition.py index 9f1f5e2480129..f87abc1e17bf8 100644 --- a/examples/docs_snippets/docs_snippets/integrations/duckdb/reference/multi_partition.py +++ b/examples/docs_snippets/docs_snippets/integrations/duckdb/reference/multi_partition.py @@ -27,7 +27,7 @@ def get_iris_data_for_date(*args, **kwargs): metadata={"partition_expr": {"date": "TO_TIMESTAMP(TIME)", "species": "SPECIES"}}, ) def iris_dataset_partitioned(context: AssetExecutionContext) -> pd.DataFrame: - partition = partition = context.partition_key.keys_by_dimension + partition = context.partition_key.keys_by_dimension # type: ignore species = partition["species"] date = partition["date"] diff --git a/examples/docs_snippets/docs_snippets/integrations/snowflake/multi_partition.py b/examples/docs_snippets/docs_snippets/integrations/snowflake/multi_partition.py index f683e0f9d6288..1080c68e7feca 100644 --- a/examples/docs_snippets/docs_snippets/integrations/snowflake/multi_partition.py +++ b/examples/docs_snippets/docs_snippets/integrations/snowflake/multi_partition.py @@ -1,5 +1,8 @@ +import pandas as pd + + def get_iris_data_for_date(*args, **kwargs): - pass + return pd.DataFrame() # start_example @@ -9,6 +12,7 @@ def get_iris_data_for_date(*args, **kwargs): from dagster import ( AssetExecutionContext, DailyPartitionsDefinition, + MultiPartitionKey, MultiPartitionsDefinition, StaticPartitionsDefinition, asset, @@ -29,7 +33,7 @@ def get_iris_data_for_date(*args, **kwargs): }, ) def iris_dataset_partitioned(context: AssetExecutionContext) -> pd.DataFrame: - partition = partition = context.partition_key.keys_by_dimension + partition = context.partition_key.keys_by_dimension # type: ignore species = partition["species"] date = partition["date"]