Skip to content

Commit

Permalink
update typing and naming
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Nov 28, 2023
1 parent d9ea3e0 commit 5e9f627
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def op_config(self) -> Any:
"""The parsed config specific to this op."""


class HasExecutionProperties(ABC):
class ContextHasExecutionProperties(ABC):
@property
@abstractmethod
def execution_properties(self) -> ExecutionProperties:
Expand All @@ -161,7 +161,9 @@ def __instancecheck__(cls, instance) -> bool:


class OpExecutionContext(
AbstractComputeExecutionContext, HasExecutionProperties, metaclass=OpExecutionContextMetaClass
AbstractComputeExecutionContext,
ContextHasExecutionProperties,
metaclass=OpExecutionContextMetaClass,
):
"""The ``context`` object that can be made available as the first argument to the function
used for computing an op or asset.
Expand Down Expand Up @@ -589,7 +591,7 @@ def retry_number(self) -> int:
return self._step_execution_context.previous_attempt_count

def describe_op(self) -> str:
return self.execution_info.step_description
return self.execution_properties.step_description

@public
def get_mapping_key(self) -> Optional[str]:
Expand Down
15 changes: 7 additions & 8 deletions python_modules/dagster/dagster/_core/execution/plan/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@
)
from dagster._core.events import DagsterEvent
from dagster._core.execution.context.compute import (
AssetExecutionContext,
OpExecutionContext,
ContextHasExecutionProperties,
)
from dagster._core.execution.context.system import StepExecutionContext
from dagster._core.system_config.objects import ResolvedRunConfig
Expand Down Expand Up @@ -154,7 +153,7 @@ def _yield_compute_results(
step_context: StepExecutionContext,
inputs: Mapping[str, Any],
compute_fn: OpComputeFunction,
compute_context: Union[OpExecutionContext, AssetExecutionContext],
compute_context: ContextHasExecutionProperties,
) -> Iterator[OpOutputUnion]:
user_event_generator = compute_fn(compute_context, inputs)

Expand All @@ -176,7 +175,7 @@ def _yield_compute_results(
if inspect.isasyncgen(user_event_generator):
user_event_generator = gen_from_async_gen(user_event_generator)

step_label = compute_context.execution_info.step_description
step_label = compute_context.execution_properties.step_description

for event in iterate_with_context(
lambda: op_execution_error_boundary(
Expand All @@ -189,19 +188,19 @@ def _yield_compute_results(
),
user_event_generator,
):
if compute_context.execution_info.op_execution_context.has_events():
yield from compute_context.execution_info.op_execution_context.consume_events()
if compute_context.execution_properties.op_execution_context.has_events():
yield from compute_context.execution_properties.op_execution_context.consume_events()
yield _validate_event(event, step_context)

if compute_context.execution_info.op_execution_context.has_events():
if compute_context.execution_properties.op_execution_context.has_events():
yield from compute_context.consume_events()


def execute_core_compute(
step_context: StepExecutionContext,
inputs: Mapping[str, Any],
compute_fn: OpComputeFunction,
compute_context: Union[OpExecutionContext, AssetExecutionContext],
compute_context: ContextHasExecutionProperties,
) -> Iterator[OpOutputUnion]:
"""Execute the user-specified compute for the op. Wrap in an error boundary and do
all relevant logging and metrics tracking.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@
from dagster._utils import is_named_tuple_instance
from dagster._utils.warnings import disable_dagster_warnings

from ..context.compute import AssetExecutionContext, OpExecutionContext
from ..context.compute import (
ContextHasExecutionProperties,
OpExecutionContext,
)


def create_op_compute_wrapper(
Expand Down Expand Up @@ -137,7 +140,7 @@ def _coerce_op_compute_fn_to_iterator(

def _zip_and_iterate_op_result(
result: Any,
context: Union[OpExecutionContext, AssetExecutionContext],
context: ContextHasExecutionProperties,
output_defs: Sequence[OutputDefinition],
) -> Iterator[Tuple[int, Any, OutputDefinition]]:
# Filtering the expected output defs here is an unfortunate temporary solution to deal with the
Expand Down Expand Up @@ -166,7 +169,7 @@ def _zip_and_iterate_op_result(
# MaterializeResult.
def _filter_expected_output_defs(
result: Any,
context: Union[OpExecutionContext, AssetExecutionContext],
context: ContextHasExecutionProperties,
output_defs: Sequence[OutputDefinition],
) -> Sequence[OutputDefinition]:
result_tuple = (
Expand All @@ -184,7 +187,7 @@ def _filter_expected_output_defs(


def _validate_multi_return(
context: Union[OpExecutionContext, AssetExecutionContext],
context: ContextHasExecutionProperties,
result: Any,
output_defs: Sequence[OutputDefinition],
) -> Any:
Expand All @@ -200,7 +203,7 @@ def _validate_multi_return(
# When returning from an op with multiple outputs, the returned object must be a tuple of the same length as the number of outputs. At the time of the op's construction, we verify that a provided annotation is a tuple with the same length as the number of outputs, so if the result matches the number of output defs on the op, it will transitively also match the annotation.
if not isinstance(result, tuple):
raise DagsterInvariantViolationError(
f"{context.execution_info.step_description} has multiple outputs, but only one "
f"{context.execution_properties.step_description} has multiple outputs, but only one "
f"output was returned of type {type(result)}. When using "
"multiple outputs, either yield each output, or return a tuple "
"containing a value for each output. Check out the "
Expand All @@ -211,9 +214,9 @@ def _validate_multi_return(
if not len(output_tuple) == len(output_defs):
raise DagsterInvariantViolationError(
"Length mismatch between returned tuple of outputs and number of "
f"output defs on {context.execution_info.step_description}. Output tuple has "
f"output defs on {context.execution_properties.step_description}. Output tuple has "
f"{len(output_tuple)} outputs, while "
f"{context.execution_info.step_description} has {len(output_defs)} outputs."
f"{context.execution_properties.step_description} has {len(output_defs)} outputs."
)
return result

Expand Down Expand Up @@ -247,7 +250,7 @@ def _check_output_object_name(

def validate_and_coerce_op_result_to_iterator(
result: Any,
context: Union[AssetExecutionContext, OpExecutionContext],
context: ContextHasExecutionProperties,
output_defs: Sequence[OutputDefinition],
) -> Iterator[Any]:
if inspect.isgenerator(result):
Expand All @@ -256,7 +259,7 @@ def validate_and_coerce_op_result_to_iterator(
yield event
elif isinstance(result, (AssetMaterialization, ExpectationResult)):
raise DagsterInvariantViolationError(
f"Error in {context.execution_info.step_description}: If you are "
f"Error in {context.execution_properties.step_description}: If you are "
"returning an AssetMaterialization "
"or an ExpectationResult from "
"an op you must yield them "
Expand All @@ -269,8 +272,8 @@ def validate_and_coerce_op_result_to_iterator(
yield result
elif result is not None and not output_defs:
raise DagsterInvariantViolationError(
f"Error in {context.execution_info.step_description}: Unexpectedly returned output of type"
f" {type(result)}. {context.execution_info.step_description} is explicitly defined to"
f"Error in {context.execution_properties.step_description}: Unexpectedly returned output of type"
f" {type(result)}. {context.execution_properties.step_description} is explicitly defined to"
" return no results."
)
# `requires_typed_event_stream` is a mode where we require users to return/yield exactly the
Expand All @@ -295,15 +298,15 @@ def validate_and_coerce_op_result_to_iterator(
if output_def.is_dynamic:
if not isinstance(element, list):
raise DagsterInvariantViolationError(
f"Error with output for {context.execution_info.step_description}: "
f"Error with output for {context.execution_properties.step_description}: "
f"dynamic output '{output_def.name}' expected a list of "
"DynamicOutput objects, but instead received instead an "
f"object of type {type(element)}."
)
for item in element:
if not isinstance(item, DynamicOutput):
raise DagsterInvariantViolationError(
f"Error with output for {context.execution_info.step_description}: "
f"Error with output for {context.execution_properties.step_description}: "
f"dynamic output '{output_def.name}' at position {position} expected a "
"list of DynamicOutput objects, but received an "
f"item with type {type(item)}."
Expand All @@ -325,7 +328,7 @@ def validate_and_coerce_op_result_to_iterator(
annotation
):
raise DagsterInvariantViolationError(
f"Error with output for {context.execution_info.step_description}: received Output object for"
f"Error with output for {context.execution_properties.step_description}: received Output object for"
f" output '{output_def.name}' which does not have an Output annotation."
f" Annotation has type {annotation}."
)
Expand All @@ -343,15 +346,15 @@ def validate_and_coerce_op_result_to_iterator(
# output object was not received, throw an error.
if is_generic_output_annotation(annotation):
raise DagsterInvariantViolationError(
f"Error with output for {context.execution_info.step_description}: output "
f"Error with output for {context.execution_properties.step_description}: output "
f"'{output_def.name}' has generic output annotation, "
"but did not receive an Output object for this output. "
f"Received instead an object of type {type(element)}."
)
if result is None and output_def.is_required is False:
context.log.warning(
'Value "None" returned for non-required output '
f'"{output_def.name}" of {context.execution_info.step_description}. '
f'"{output_def.name}" of {context.execution_properties.step_description}. '
"This value will be passed to downstream "
f"{context.op_def.node_type_str}s. For conditional "
"execution, results must be yielded: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ def test_deprecation_warnings():
"is_subset",
"partition_keys",
"get",
"execution_info",
"_execution_info",
"execution_properties",
"_execution_props",
]

other_ignores = [
Expand Down

0 comments on commit 5e9f627

Please sign in to comment.