From 1e60afe4feae9a7e5827aca01869b63f259b4d83 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Mon, 22 Apr 2024 13:36:22 -0700 Subject: [PATCH] include input loading time in step duration (#21346) ## Summary & Motivation The step duration we report oddly included `handle_output` time, but not `load_input` time. This PR makes it include both. This was reported by a user. ## How I Tested These Changes --- .../_core/execution/plan/execute_step.py | 68 ++++++++++--------- .../execution_tests/test_timing.py | 23 ++++++- 2 files changed, 57 insertions(+), 34 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py index 0872e0e9a1883..5f4053efe28e3 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py +++ b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py @@ -471,44 +471,46 @@ def core_dagster_event_sequence_for_step( else: yield DagsterEvent.step_start_event(step_context) - inputs = {} + with time_execution_scope() as timer_result, enter_execution_context( + step_context + ) as compute_context: + inputs = {} - if step_context.is_sda_step: - step_context.fetch_external_input_asset_version_info() + if step_context.is_sda_step: + step_context.fetch_external_input_asset_version_info() - for step_input in step_context.step.step_inputs: - input_def = step_context.op_def.input_def_named(step_input.name) - dagster_type = input_def.dagster_type + for step_input in step_context.step.step_inputs: + input_def = step_context.op_def.input_def_named(step_input.name) + dagster_type = input_def.dagster_type - if dagster_type.is_nothing: - continue + if dagster_type.is_nothing: + continue - for event_or_input_value in step_input.source.load_input_object(step_context, input_def): - if isinstance(event_or_input_value, DagsterEvent): - yield event_or_input_value - else: - check.invariant(step_input.name not in inputs) - inputs[step_input.name] = event_or_input_value - - for input_name, input_value in inputs.items(): - for evt in check.generator( - _type_checked_event_sequence_for_input(step_context, input_name, input_value) - ): - yield evt - - # The core execution loop expects a compute generator in a specific format: a generator that - # takes a context and dictionary of inputs as input, yields output events. If an op definition - # was generated from the @op decorator, then compute_fn needs to be coerced - # into this format. If the op definition was created directly, then it is expected that the - # compute_fn is already in this format. - if isinstance(step_context.op_def.compute_fn, DecoratedOpFunction): - core_gen = create_op_compute_wrapper(step_context.op_def) - else: - core_gen = step_context.op_def.compute_fn + for event_or_input_value in step_input.source.load_input_object( + step_context, input_def + ): + if isinstance(event_or_input_value, DagsterEvent): + yield event_or_input_value + else: + check.invariant(step_input.name not in inputs) + inputs[step_input.name] = event_or_input_value + + for input_name, input_value in inputs.items(): + for evt in check.generator( + _type_checked_event_sequence_for_input(step_context, input_name, input_value) + ): + yield evt + + # The core execution loop expects a compute generator in a specific format: a generator that + # takes a context and dictionary of inputs as input, yields output events. If an op definition + # was generated from the @op decorator, then compute_fn needs to be coerced + # into this format. If the op definition was created directly, then it is expected that the + # compute_fn is already in this format. + if isinstance(step_context.op_def.compute_fn, DecoratedOpFunction): + core_gen = create_op_compute_wrapper(step_context.op_def) + else: + core_gen = step_context.op_def.compute_fn - with time_execution_scope() as timer_result, enter_execution_context( - step_context - ) as compute_context: user_event_sequence = execute_core_compute( step_context, inputs, diff --git a/python_modules/dagster/dagster_tests/execution_tests/test_timing.py b/python_modules/dagster/dagster_tests/execution_tests/test_timing.py index 8a60a2cfa5bef..ef1321a15e99c 100644 --- a/python_modules/dagster/dagster_tests/execution_tests/test_timing.py +++ b/python_modules/dagster/dagster_tests/execution_tests/test_timing.py @@ -2,7 +2,7 @@ import time import pytest -from dagster import GraphDefinition, Output, op +from dagster import GraphDefinition, In, Output, input_manager, op @pytest.mark.skipif( @@ -49,3 +49,24 @@ def direct_return_op(_context): result = job_def.execute_in_process() success_event = result.get_step_success_events()[0] assert success_event.event_specific_data.duration_ms >= 10.0 + + +@pytest.mark.skipif( + sys.platform == "win32", reason="https://github.com/dagster-io/dagster/issues/1421" +) +def test_event_timing_include_input_loading(): + @op(ins={"in1": In(input_manager_key="foo")}) + def direct_return_op(in1): + return None + + @input_manager + def my_input_manager(): + time.sleep(0.01) + return None + + job_def = GraphDefinition(node_defs=[direct_return_op], name="test").to_job( + resource_defs={"foo": my_input_manager} + ) + result = job_def.execute_in_process() + success_event = result.get_step_success_events()[0] + assert success_event.event_specific_data.duration_ms >= 10.0