diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index edaccaa2f041c..fb1aa01ac2d0f 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -308,6 +308,40 @@ def my_asset(context: AssetExecutionContext): """ return self._step_execution_context.partition_key + @public + @property + def partition_keys(self) -> Sequence[str]: + """Returns a list of the partition keys for the current run. + + If you want to write your asset to support running a backfill of several partitions in a single run, + you can use ``partition_keys`` to get all of the partitions being materialized + by the backfill. + + Examples: + .. code-block:: python + + partitions_def = DailyPartitionsDefinition("2023-08-20") + + @asset(partitions_def=partitions_def) + def an_asset(context: AssetExecutionContext): + context.log.info(context.partition_keys) + + + # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log: + # ["2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24", "2023-08-25"] + """ + key_range = self.partition_key_range + partitions_def = self.assets_def.partitions_def + if partitions_def is None: + raise DagsterInvariantViolationError( + "Cannot access partition_keys for a non-partitioned run" + ) + + return partitions_def.get_partition_keys_in_range( + key_range, + dynamic_partitions_store=self.instance, + ) + @deprecated(breaking_version="2.0", additional_warn_text="Use `partition_key_range` instead.") @public @property diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py index 0be1074d919a5..d0dac272893ee 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py @@ -604,6 +604,7 @@ def upstream_asset(context) -> None: partitions_def.time_window_for_partition_key(key_range.start).start, partitions_def.time_window_for_partition_key(key_range.end).end, ) + assert context.partition_keys == partitions_def.get_partition_keys_in_range(key_range) @asset(partitions_def=partitions_def, deps=["upstream_asset"]) def downstream_asset(context) -> None: