Skip to content

Commit

Permalink
include input loading time in step duration (dagster-io#21346)
Browse files Browse the repository at this point in the history
## Summary & Motivation

The step duration we report oddly included `handle_output` time, but not
`load_input` time. This PR makes it include both. This was reported by a
user.

## How I Tested These Changes
  • Loading branch information
sryza authored and nikomancy committed May 1, 2024
1 parent 9ad13c3 commit 1e60afe
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 34 deletions.
68 changes: 35 additions & 33 deletions python_modules/dagster/dagster/_core/execution/plan/execute_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,44 +471,46 @@ def core_dagster_event_sequence_for_step(
else:
yield DagsterEvent.step_start_event(step_context)

inputs = {}
with time_execution_scope() as timer_result, enter_execution_context(
step_context
) as compute_context:
inputs = {}

if step_context.is_sda_step:
step_context.fetch_external_input_asset_version_info()
if step_context.is_sda_step:
step_context.fetch_external_input_asset_version_info()

for step_input in step_context.step.step_inputs:
input_def = step_context.op_def.input_def_named(step_input.name)
dagster_type = input_def.dagster_type
for step_input in step_context.step.step_inputs:
input_def = step_context.op_def.input_def_named(step_input.name)
dagster_type = input_def.dagster_type

if dagster_type.is_nothing:
continue
if dagster_type.is_nothing:
continue

for event_or_input_value in step_input.source.load_input_object(step_context, input_def):
if isinstance(event_or_input_value, DagsterEvent):
yield event_or_input_value
else:
check.invariant(step_input.name not in inputs)
inputs[step_input.name] = event_or_input_value

for input_name, input_value in inputs.items():
for evt in check.generator(
_type_checked_event_sequence_for_input(step_context, input_name, input_value)
):
yield evt

# The core execution loop expects a compute generator in a specific format: a generator that
# takes a context and dictionary of inputs as input, yields output events. If an op definition
# was generated from the @op decorator, then compute_fn needs to be coerced
# into this format. If the op definition was created directly, then it is expected that the
# compute_fn is already in this format.
if isinstance(step_context.op_def.compute_fn, DecoratedOpFunction):
core_gen = create_op_compute_wrapper(step_context.op_def)
else:
core_gen = step_context.op_def.compute_fn
for event_or_input_value in step_input.source.load_input_object(
step_context, input_def
):
if isinstance(event_or_input_value, DagsterEvent):
yield event_or_input_value
else:
check.invariant(step_input.name not in inputs)
inputs[step_input.name] = event_or_input_value

for input_name, input_value in inputs.items():
for evt in check.generator(
_type_checked_event_sequence_for_input(step_context, input_name, input_value)
):
yield evt

# The core execution loop expects a compute generator in a specific format: a generator that
# takes a context and dictionary of inputs as input, yields output events. If an op definition
# was generated from the @op decorator, then compute_fn needs to be coerced
# into this format. If the op definition was created directly, then it is expected that the
# compute_fn is already in this format.
if isinstance(step_context.op_def.compute_fn, DecoratedOpFunction):
core_gen = create_op_compute_wrapper(step_context.op_def)
else:
core_gen = step_context.op_def.compute_fn

with time_execution_scope() as timer_result, enter_execution_context(
step_context
) as compute_context:
user_event_sequence = execute_core_compute(
step_context,
inputs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import time

import pytest
from dagster import GraphDefinition, Output, op
from dagster import GraphDefinition, In, Output, input_manager, op


@pytest.mark.skipif(
Expand Down Expand Up @@ -49,3 +49,24 @@ def direct_return_op(_context):
result = job_def.execute_in_process()
success_event = result.get_step_success_events()[0]
assert success_event.event_specific_data.duration_ms >= 10.0


@pytest.mark.skipif(
    sys.platform == "win32", reason="https://github.com/dagster-io/dagster/issues/1421"
)
def test_event_timing_include_input_loading():
    """Check that the reported step duration covers input loading time.

    The op's single input is loaded through an input manager (resource key
    ``"foo"``) that sleeps for 0.01 s. The op body itself returns immediately,
    so the first step-success event can only report ``duration_ms >= 10.0`` if
    the time spent loading the input is counted.
    """

    @input_manager
    def my_input_manager():
        # Stand-in for a slow input load: 0.01 s == 10 ms, the asserted bound.
        time.sleep(0.01)
        return None

    @op(ins={"in1": In(input_manager_key="foo")})
    def direct_return_op(in1):
        return None

    graph_def = GraphDefinition(node_defs=[direct_return_op], name="test")
    job = graph_def.to_job(resource_defs={"foo": my_input_manager})

    execution_result = job.execute_in_process()
    step_success = execution_result.get_step_success_events()[0]
    assert step_success.event_specific_data.duration_ms >= 10.0

0 comments on commit 1e60afe

Please sign in to comment.