Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make input partitions methods based on asset key not input name #19027

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
ea85cfb
add DI asset context
jamiedemaria Dec 6, 2023
45bca56
add parent class so typing can start to work
jamiedemaria Dec 6, 2023
8a5b90e
fix circular import
jamiedemaria Dec 7, 2023
007fc0e
fix circular import
jamiedemaria Dec 7, 2023
26a4ea1
fixt two tests
jamiedemaria Dec 8, 2023
daed438
add is_bound
jamiedemaria Dec 8, 2023
6d710dc
add run prop test
jamiedemaria Dec 8, 2023
260fa03
pipes
jamiedemaria Dec 8, 2023
2f31b87
update for dowstream changes
jamiedemaria Dec 28, 2023
bf0f24d
update naming
jamiedemaria Dec 28, 2023
7c4481e
remove unneeded test
jamiedemaria Jan 2, 2024
7913807
pyright fix
jamiedemaria Jan 2, 2024
ac881a4
small cleanup
jamiedemaria Jan 26, 2024
b00ca5d
add time window to DI contexts
jamiedemaria Jan 26, 2024
45b652e
update for rename
jamiedemaria Jan 29, 2024
6732f46
remove thing that needs to be in other branch
jamiedemaria Jan 30, 2024
63e5bc6
comment
jamiedemaria Jan 30, 2024
303086e
small
jamiedemaria Jan 2, 2024
dd1e5db
asset materialization event
jamiedemaria Jan 2, 2024
7da9f4b
better naming
jamiedemaria Jan 2, 2024
32ab2f6
via op context
jamiedemaria Jan 4, 2024
6e74a25
rename and add exception
jamiedemaria Jan 16, 2024
fc5c91a
DI context
jamiedemaria Jan 16, 2024
686afd7
fix naming
jamiedemaria Jan 17, 2024
54cf22d
move storing AM to when it is accurate
jamiedemaria Jan 19, 2024
cdf607e
make input partition methods based on asset key
jamiedemaria Jan 4, 2024
4ba8938
fix straggler
jamiedemaria Jan 4, 2024
60e6044
clean up
jamiedemaria Jan 17, 2024
052f08e
straggling rename
jamiedemaria Jan 18, 2024
a57ce96
add back the input methods so we dont make a breaking change
jamiedemaria Jan 19, 2024
d2ab183
Revert "add back the input methods so we dont make a breaking change"
jamiedemaria Jan 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 28 additions & 16 deletions python_modules/dagster/dagster/_core/definitions/op_invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
from .result import MaterializeResult

if TYPE_CHECKING:
from ..execution.context.invocation import DirectOpExecutionContext
from ..execution.context.compute import OpExecutionContext
from ..execution.context.invocation import BaseDirectExecutionContext
from .assets import AssetsDefinition
from .composition import PendingNodeInvocation
from .decorators.op_decorator import DecoratedOpFunction
Expand Down Expand Up @@ -100,6 +101,16 @@ def _separate_args_and_kwargs(
)


def _get_op_context(
    context,
) -> "OpExecutionContext":
    """Return the op-level execution context behind ``context``.

    An ``AssetExecutionContext`` is unwrapped to its ``op_execution_context``;
    any other value is handed back unchanged.
    """
    # Function-scope import — presumably to avoid a circular import at module
    # load time; confirm before hoisting it to the top of the file.
    from dagster._core.execution.context.compute import AssetExecutionContext

    is_asset_context = isinstance(context, AssetExecutionContext)
    return context.op_execution_context if is_asset_context else context


def direct_invocation_result(
def_or_invocation: Union[
"OpDefinition", "PendingNodeInvocation[OpDefinition]", "AssetsDefinition"
Expand All @@ -109,7 +120,7 @@ def direct_invocation_result(
) -> Any:
from dagster._config.pythonic_config import Config
from dagster._core.execution.context.invocation import (
DirectOpExecutionContext,
BaseDirectExecutionContext,
build_op_context,
)

Expand Down Expand Up @@ -149,12 +160,12 @@ def direct_invocation_result(
" no context was provided when invoking."
)
if len(args) > 0:
if args[0] is not None and not isinstance(args[0], DirectOpExecutionContext):
if args[0] is not None and not isinstance(args[0], BaseDirectExecutionContext):
raise DagsterInvalidInvocationError(
f"Decorated function '{compute_fn.name}' has context argument, "
"but no context was provided when invoking."
)
context = cast(DirectOpExecutionContext, args[0])
context = args[0]
# update args to omit context
args = args[1:]
else: # context argument is provided under kwargs
Expand All @@ -165,14 +176,14 @@ def direct_invocation_result(
f"'{context_param_name}', but no value for '{context_param_name}' was "
f"found when invoking. Provided kwargs: {kwargs}"
)
context = cast(DirectOpExecutionContext, kwargs[context_param_name])
context = kwargs[context_param_name]
# update kwargs to remove context
kwargs = {
kwarg: val for kwarg, val in kwargs.items() if not kwarg == context_param_name
}
# allow passing context, even if the function doesn't have an arg for it
elif len(args) > 0 and isinstance(args[0], DirectOpExecutionContext):
context = cast(DirectOpExecutionContext, args[0])
elif len(args) > 0 and isinstance(args[0], BaseDirectExecutionContext):
context = args[0]
args = args[1:]

resource_arg_mapping = {arg.name: arg.name for arg in compute_fn.get_resource_args()}
Expand Down Expand Up @@ -230,7 +241,7 @@ def direct_invocation_result(


def _resolve_inputs(
op_def: "OpDefinition", args, kwargs, context: "DirectOpExecutionContext"
op_def: "OpDefinition", args, kwargs, context: "BaseDirectExecutionContext"
) -> Mapping[str, Any]:
from dagster._core.execution.plan.execute_step import do_type_check

Expand Down Expand Up @@ -333,7 +344,7 @@ def _resolve_inputs(
return input_dict


def _key_for_result(result: MaterializeResult, context: "DirectOpExecutionContext") -> AssetKey:
def _key_for_result(result: MaterializeResult, context: "BaseDirectExecutionContext") -> AssetKey:
if not context.per_invocation_properties.assets_def:
raise DagsterInvariantViolationError(
f"Op {context.per_invocation_properties.alias} does not have an assets definition."
Expand All @@ -355,7 +366,7 @@ def _key_for_result(result: MaterializeResult, context: "DirectOpExecutionContex

def _output_name_for_result_obj(
event: MaterializeResult,
context: "DirectOpExecutionContext",
context: "BaseDirectExecutionContext",
):
if not context.per_invocation_properties.assets_def:
raise DagsterInvariantViolationError(
Expand All @@ -368,7 +379,7 @@ def _output_name_for_result_obj(
def _handle_gen_event(
event: T,
op_def: "OpDefinition",
context: "DirectOpExecutionContext",
context: "BaseDirectExecutionContext",
output_defs: Mapping[str, OutputDefinition],
outputs_seen: Set[str],
) -> T:
Expand Down Expand Up @@ -402,7 +413,7 @@ def _handle_gen_event(


def _type_check_output_wrapper(
op_def: "OpDefinition", result: Any, context: "DirectOpExecutionContext"
op_def: "OpDefinition", result: Any, context: "BaseDirectExecutionContext"
) -> Any:
"""Type checks and returns the result of a op.

Expand Down Expand Up @@ -496,12 +507,13 @@ def type_check_gen(gen):


def _type_check_function_output(
op_def: "OpDefinition", result: T, context: "DirectOpExecutionContext"
op_def: "OpDefinition", result: T, context: "BaseDirectExecutionContext"
) -> T:
from ..execution.plan.compute_generator import validate_and_coerce_op_result_to_iterator

output_defs_by_name = {output_def.name: output_def for output_def in op_def.output_defs}
for event in validate_and_coerce_op_result_to_iterator(result, context, op_def.output_defs):
op_context = _get_op_context(context)
for event in validate_and_coerce_op_result_to_iterator(result, op_context, op_def.output_defs):
if isinstance(event, (Output, DynamicOutput)):
_type_check_output(output_defs_by_name[event.output_name], event, context)
elif isinstance(event, (MaterializeResult)):
Expand All @@ -515,14 +527,14 @@ def _type_check_function_output(
def _type_check_output(
output_def: "OutputDefinition",
output: Union[Output, DynamicOutput],
context: "DirectOpExecutionContext",
context: "BaseDirectExecutionContext",
) -> None:
"""Validates and performs core type check on a provided output.

Args:
output_def (OutputDefinition): The output definition to validate against.
output (Any): The output to validate.
context (DirectOpExecutionContext): Context containing resources to be used for type
context (BaseDirectExecutionContext): Context containing resources to be used for type
check.
"""
from ..execution.plan.execute_step import do_type_check
Expand Down
44 changes: 39 additions & 5 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
AssetKey,
AssetMaterialization,
AssetObservation,
CoercibleToAssetKey,
ExpectationResult,
UserEvent,
)
Expand Down Expand Up @@ -948,7 +949,10 @@ def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):


"""
return self._step_execution_context.asset_partition_key_range_for_input(input_name)
upstream_asset_key = self.asset_key_for_input(input_name)
return self._step_execution_context.asset_partition_key_range_for_upstream_asset(
upstream_asset_key
)

@public
def asset_partition_key_for_input(self, input_name: str) -> str:
Expand Down Expand Up @@ -991,7 +995,10 @@ def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
# "2023-08-20"

"""
return self._step_execution_context.asset_partition_key_for_input(input_name)
upstream_asset_key = self.asset_key_for_input(input_name)
return self._step_execution_context.asset_partition_key_for_upstream_asset(
upstream_asset_key
)

@public
def asset_partitions_def_for_output(self, output_name: str = "result") -> PartitionsDefinition:
Expand Down Expand Up @@ -1207,9 +1214,10 @@ def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
# running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
# ["2023-08-20", "2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24"]
"""
upstream_asset_key = self.asset_key_for_input(input_name)
return list(
self._step_execution_context.asset_partitions_subset_for_input(
input_name
self._step_execution_context.asset_partitions_subset_for_upstream_asset(
upstream_asset_key
).get_partition_keys()
)

Expand Down Expand Up @@ -1287,7 +1295,10 @@ def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
# TimeWindow("2023-08-20", "2023-08-25")

"""
return self._step_execution_context.asset_partitions_time_window_for_input(input_name)
upstream_asset_key = self.asset_key_for_input(input_name)
return self._step_execution_context.asset_partitions_time_window_for_upstream_asset(
upstream_asset_key
)

@public
@experimental
Expand Down Expand Up @@ -1459,6 +1470,29 @@ def job_def(self) -> JobDefinition:
"""
return self.op_execution_context.job_def

@public
def latest_materialization_for_upstream_asset(
    self, key: CoercibleToAssetKey
) -> Optional[AssetMaterialization]:
    """Get the most recent AssetMaterialization event for an upstream asset.

    The key must be an upstream asset of the currently materializing asset.
    Information like metadata and tags can be found on the returned
    AssetMaterialization.

    Args:
        key (CoercibleToAssetKey): The upstream asset to look up. It is coerced with
            ``AssetKey.from_coercible`` before the lookup.

    Returns:
        Optional[AssetMaterialization]: The latest materialization recorded for the key, or
        None if no AssetMaterialization exists for it.

    Raises:
        DagsterInvariantViolationError: If the key is not an upstream asset of the currently
            materializing asset.
    """
    materialization_events = (
        self.op_execution_context._step_execution_context.upstream_asset_materialization_events  # noqa: SLF001
    )
    # Coerce once and reuse the result for both the membership test and the lookup.
    asset_key = AssetKey.from_coercible(key)
    if asset_key not in materialization_events:
        # The second literal starts with a space so the implicitly concatenated
        # message does not read "dependencyin order".
        raise DagsterInvariantViolationError(
            f"Cannot fetch AssetMaterialization for asset {key}. {key} must be an upstream"
            " dependency in order to call latest_materialization_for_upstream_asset."
        )

    return materialization_events[asset_key]

######## Deprecated methods

@deprecated(**_get_deprecation_kwargs("dagster_run"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ def asset_partitions_time_window(self) -> TimeWindow:
"Tried to access asset_partitions_time_window, but the asset is not partitioned.",
)

return self.step_context.asset_partitions_time_window_for_input(self.name)
return self.step_context.asset_partitions_time_window_for_upstream_asset(self.asset_key)

@public
def get_identifier(self) -> Sequence[str]:
Expand Down
Loading