Skip to content

Commit

Permalink
pr comment
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Nov 3, 2023
1 parent 4a6f1c9 commit 4469d08
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,6 @@ def partition_keys(self) -> Sequence[str]:
you can use ``partition_keys`` to get all of the partitions being materialized
by the backfill.
Raises an error if the asset is a multi-asset. In a multi-asset, use ``asset_partition_keys_for_output``
Examples:
.. code-block:: python
Expand All @@ -331,25 +329,12 @@ def an_asset(context: AssetExecutionContext):
# running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
# ["2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24", "2023-08-25"]
@asset(
partitions_def=partitions_def,
ins={
"self_dependent_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
}
)
def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
context.log.info(context.partition_keys)
# running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
# ["2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24", "2023-08-25"]
"""
key_range = self.partition_key_range
partitions_def = self.assets_def.partitions_def
if partitions_def is None:
raise DagsterInvariantViolationError(
"Cannot access partition_key for a non-partitioned run"
"Cannot access partition_keys for a non-partitioned run"
)

return partitions_def.get_partition_keys_in_range(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,7 @@ def upstream_asset(context) -> None:
partitions_def.time_window_for_partition_key(key_range.start).start,
partitions_def.time_window_for_partition_key(key_range.end).end,
)
assert context.partition_keys == partitions_def.get_partition_keys_in_range(key_range)

@asset(partitions_def=partitions_def, deps=["upstream_asset"])
def downstream_asset(context) -> None:
Expand Down

0 comments on commit 4469d08

Please sign in to comment.