diff --git a/python_modules/dagster/dagster/_core/execution/plan/active.py b/python_modules/dagster/dagster/_core/execution/plan/active.py index 7c9eda73fe644..fa02c3fe64bc5 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/active.py +++ b/python_modules/dagster/dagster/_core/execution/plan/active.py @@ -586,6 +586,10 @@ def is_complete(self) -> bool: def retry_state(self) -> RetryState: return self._retry_state + @property + def has_in_flight_steps(self) -> bool: + return len(self._in_flight) > 0 + def get_known_state(self) -> KnownExecutionState: return KnownExecutionState( previous_retry_attempts=self._retry_state.snapshot_attempts(), diff --git a/python_modules/dagster/dagster/_core/executor/step_delegating/step_delegating_executor.py b/python_modules/dagster/dagster/_core/executor/step_delegating/step_delegating_executor.py index 556d9566991b5..1b04aac74c57d 100644 --- a/python_modules/dagster/dagster/_core/executor/step_delegating/step_delegating_executor.py +++ b/python_modules/dagster/dagster/_core/executor/step_delegating/step_delegating_executor.py @@ -233,32 +233,35 @@ def execute(self, plan_context: PlanOrchestrationContext, execution_plan: Execut return - for dagster_event in self._pop_events( - plan_context.instance, - plan_context.run_id, - ): - yield dagster_event - # STEP_SKIPPED events are only emitted by ActiveExecution, which already handles - # and yields them. 
- - if dagster_event.is_step_skipped: - assert isinstance(dagster_event.step_key, str) - active_execution.verify_complete(plan_context, dagster_event.step_key) - else: - active_execution.handle_event(dagster_event) - if ( - dagster_event.is_step_success - or dagster_event.is_step_failure - or dagster_event.is_resource_init_failure - or dagster_event.is_step_up_for_retry - ): + if active_execution.has_in_flight_steps: + for dagster_event in self._pop_events( + plan_context.instance, + plan_context.run_id, + ): + yield dagster_event + # STEP_SKIPPED events are only emitted by ActiveExecution, which already handles + # and yields them. + + if dagster_event.is_step_skipped: assert isinstance(dagster_event.step_key, str) - del running_steps[dagster_event.step_key] - - if not dagster_event.is_step_up_for_retry: - active_execution.verify_complete( - plan_context, dagster_event.step_key - ) + active_execution.verify_complete( + plan_context, dagster_event.step_key + ) + else: + active_execution.handle_event(dagster_event) + if ( + dagster_event.is_step_success + or dagster_event.is_step_failure + or dagster_event.is_resource_init_failure + or dagster_event.is_step_up_for_retry + ): + assert isinstance(dagster_event.step_key, str) + del running_steps[dagster_event.step_key] + + if not dagster_event.is_step_up_for_retry: + active_execution.verify_complete( + plan_context, dagster_event.step_key + ) # process skips from failures or uncovered inputs list(active_execution.plan_events_iterator(plan_context)) diff --git a/python_modules/dagster/dagster/_utils/test/__init__.py b/python_modules/dagster/dagster/_utils/test/__init__.py index b3e44ee79913d..1074c5abb1610 100644 --- a/python_modules/dagster/dagster/_utils/test/__init__.py +++ b/python_modules/dagster/dagster/_utils/test/__init__.py @@ -3,7 +3,7 @@ import tempfile from collections import defaultdict from contextlib import contextmanager -from typing import Any, Dict, List, Mapping, Optional, Type, cast +from typing 
import Any, Dict, List, Mapping, Optional, Set, Type, Union, cast # top-level include is dangerous in terms of incurring circular deps from dagster import ( @@ -26,6 +26,7 @@ from dagster._core.definitions.job_base import InMemoryJob from dagster._core.definitions.logger_definition import LoggerDefinition from dagster._core.definitions.resource_definition import ScopedResourcesBuilder +from dagster._core.events import DagsterEventType from dagster._core.execution.api import create_execution_plan from dagster._core.execution.context.system import PlanExecutionContext from dagster._core.execution.context_creation_job import ( @@ -39,6 +40,7 @@ from dagster._core.instance import DagsterInstance from dagster._core.scheduler import Scheduler from dagster._core.storage.dagster_run import DagsterRun +from dagster._core.storage.event_log.base import EventLogConnection from dagster._core.storage.event_log.sqlite.sqlite_event_log import SqliteEventLogStorage from dagster._core.storage.sqlite_storage import SqliteStorageConfig from dagster._core.utility_ops import create_stub_op @@ -301,6 +303,7 @@ def __init__( ): self._sleep_interval = sleep_interval self._check_calls = defaultdict(int) + self._records_for_run_calls = defaultdict(int) super().__init__(base_dir, inst_data) @classmethod @@ -317,9 +320,23 @@ def from_config_value( def supports_global_concurrency_limits(self) -> bool: return True + def get_records_for_run( + self, + run_id: str, + cursor: Optional[str] = None, + of_type: Optional[Union[DagsterEventType, Set[DagsterEventType]]] = None, + limit: Optional[int] = None, + ascending: bool = True, + ) -> EventLogConnection: + self._records_for_run_calls[run_id] = self._records_for_run_calls[run_id] + 1 + return super().get_records_for_run(run_id, cursor, of_type, limit, ascending) + def get_check_calls(self, step_key: str) -> int: return self._check_calls[step_key] + def get_records_for_run_calls(self, run_id: str) -> int: + return self._records_for_run_calls[run_id] 
+ def check_concurrency_claim( self, concurrency_key: str, run_id: str, step_key: str ) -> ConcurrencyClaimStatus: diff --git a/python_modules/dagster/dagster_tests/execution_tests/engine_tests/test_step_delegating_executor.py b/python_modules/dagster/dagster_tests/execution_tests/engine_tests/test_step_delegating_executor.py index cfcd73fa78dbf..1e9634923ac71 100644 --- a/python_modules/dagster/dagster_tests/execution_tests/engine_tests/test_step_delegating_executor.py +++ b/python_modules/dagster/dagster_tests/execution_tests/engine_tests/test_step_delegating_executor.py @@ -1,4 +1,6 @@ import subprocess +import tempfile +import threading import time import pytest @@ -30,6 +32,7 @@ StepDelegatingExecutor, StepHandler, ) +from dagster._core.storage.tags import GLOBAL_CONCURRENCY_TAG from dagster._core.test_utils import instance_for_test from dagster._utils.merger import merge_dicts @@ -456,3 +459,53 @@ def get_dynamic_op_failure_job(): def test_dynamic_failure_retry(job_fn, config_fn): TestStepHandler.reset() assert_expected_failure_behavior(job_fn, config_fn) + + +@op(tags={GLOBAL_CONCURRENCY_TAG: "foo"}) +def simple_op(context): + time.sleep(0.1) + foo_info = context.instance.event_log_storage.get_concurrency_info("foo") + return {"active": foo_info.active_slot_count, "pending": foo_info.pending_step_count} + + +@job(executor_def=test_step_delegating_executor) +def simple_job(): + simple_op() + + +def test_blocked_concurrency_limits(): + TestStepHandler.reset() + with tempfile.TemporaryDirectory() as temp_dir: + with instance_for_test( + temp_dir=temp_dir, + overrides={ + "event_log_storage": { + "module": "dagster.utils.test", + "class": "ConcurrencyEnabledSqliteTestEventLogStorage", + "config": {"base_dir": temp_dir}, + } + }, + ) as instance: + instance.event_log_storage.set_concurrency_slots("foo", 0) + + def _unblock_concurrency_key(instance, timeout): + time.sleep(timeout) + instance.event_log_storage.set_concurrency_slots("foo", 1) + + TIMEOUT = 3 + 
threading.Thread( + target=_unblock_concurrency_key, args=(instance, TIMEOUT), daemon=True + ).start() + with execute_job(reconstructable(simple_job), instance=instance) as result: + TestStepHandler.wait_for_processes() + assert result.success + assert any( + [ + "blocked by concurrency limit for key foo" in (event.message or "") + for event in result.all_events + ] + ) + # the executor loop sleeps every second; if it polled for events while the step was + # blocked there would be at least one call per second blocked. It should only poll + # while steps are in flight, so the total number of calls stays small + assert instance.event_log_storage.get_records_for_run_calls(result.run_id) <= 3