Skip to content

Commit

Permalink
RFC: change step delegating executor to defer event_log queries when …
Browse files Browse the repository at this point in the history
…no steps are in flight (#19208)

## Summary & Motivation
Changes the step-delegating executor to defer event log queries when
there are no in-progress steps

## How I Tested These Changes
BK
  • Loading branch information
prha authored Feb 6, 2024
1 parent 7327390 commit 9481478
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 26 deletions.
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster/_core/execution/plan/active.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,10 @@ def is_complete(self) -> bool:
def retry_state(self) -> RetryState:
return self._retry_state

@property
def has_in_flight_steps(self) -> bool:
return len(self._in_flight) > 0

def get_known_state(self) -> KnownExecutionState:
return KnownExecutionState(
previous_retry_attempts=self._retry_state.snapshot_attempts(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,32 +233,35 @@ def execute(self, plan_context: PlanOrchestrationContext, execution_plan: Execut

return

for dagster_event in self._pop_events(
plan_context.instance,
plan_context.run_id,
):
yield dagster_event
# STEP_SKIPPED events are only emitted by ActiveExecution, which already handles
# and yields them.

if dagster_event.is_step_skipped:
assert isinstance(dagster_event.step_key, str)
active_execution.verify_complete(plan_context, dagster_event.step_key)
else:
active_execution.handle_event(dagster_event)
if (
dagster_event.is_step_success
or dagster_event.is_step_failure
or dagster_event.is_resource_init_failure
or dagster_event.is_step_up_for_retry
):
if active_execution.has_in_flight_steps:
for dagster_event in self._pop_events(
plan_context.instance,
plan_context.run_id,
):
yield dagster_event
# STEP_SKIPPED events are only emitted by ActiveExecution, which already handles
# and yields them.

if dagster_event.is_step_skipped:
assert isinstance(dagster_event.step_key, str)
del running_steps[dagster_event.step_key]

if not dagster_event.is_step_up_for_retry:
active_execution.verify_complete(
plan_context, dagster_event.step_key
)
active_execution.verify_complete(
plan_context, dagster_event.step_key
)
else:
active_execution.handle_event(dagster_event)
if (
dagster_event.is_step_success
or dagster_event.is_step_failure
or dagster_event.is_resource_init_failure
or dagster_event.is_step_up_for_retry
):
assert isinstance(dagster_event.step_key, str)
del running_steps[dagster_event.step_key]

if not dagster_event.is_step_up_for_retry:
active_execution.verify_complete(
plan_context, dagster_event.step_key
)

# process skips from failures or uncovered inputs
list(active_execution.plan_events_iterator(plan_context))
Expand Down
19 changes: 18 additions & 1 deletion python_modules/dagster/dagster/_utils/test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import tempfile
from collections import defaultdict
from contextlib import contextmanager
from typing import Any, Dict, List, Mapping, Optional, Type, cast
from typing import Any, Dict, List, Mapping, Optional, Set, Type, Union, cast

# top-level include is dangerous in terms of incurring circular deps
from dagster import (
Expand All @@ -26,6 +26,7 @@
from dagster._core.definitions.job_base import InMemoryJob
from dagster._core.definitions.logger_definition import LoggerDefinition
from dagster._core.definitions.resource_definition import ScopedResourcesBuilder
from dagster._core.events import DagsterEventType
from dagster._core.execution.api import create_execution_plan
from dagster._core.execution.context.system import PlanExecutionContext
from dagster._core.execution.context_creation_job import (
Expand All @@ -39,6 +40,7 @@
from dagster._core.instance import DagsterInstance
from dagster._core.scheduler import Scheduler
from dagster._core.storage.dagster_run import DagsterRun
from dagster._core.storage.event_log.base import EventLogConnection
from dagster._core.storage.event_log.sqlite.sqlite_event_log import SqliteEventLogStorage
from dagster._core.storage.sqlite_storage import SqliteStorageConfig
from dagster._core.utility_ops import create_stub_op
Expand Down Expand Up @@ -301,6 +303,7 @@ def __init__(
):
self._sleep_interval = sleep_interval
self._check_calls = defaultdict(int)
self._records_for_run_calls = defaultdict(int)
super().__init__(base_dir, inst_data)

@classmethod
Expand All @@ -317,9 +320,23 @@ def from_config_value(
def supports_global_concurrency_limits(self) -> bool:
return True

def get_records_for_run(
self,
run_id: str,
cursor: Optional[str] = None,
of_type: Optional[Union[DagsterEventType, Set[DagsterEventType]]] = None,
limit: Optional[int] = None,
ascending: bool = True,
) -> EventLogConnection:
self._records_for_run_calls[run_id] = self._records_for_run_calls[run_id] + 1
return super().get_records_for_run(run_id, cursor, of_type, limit, ascending)

def get_check_calls(self, step_key: str) -> int:
return self._check_calls[step_key]

def get_records_for_run_calls(self, run_id: str) -> int:
return self._records_for_run_calls[run_id]

def check_concurrency_claim(
self, concurrency_key: str, run_id: str, step_key: str
) -> ConcurrencyClaimStatus:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import subprocess
import tempfile
import threading
import time

import pytest
Expand Down Expand Up @@ -30,6 +32,7 @@
StepDelegatingExecutor,
StepHandler,
)
from dagster._core.storage.tags import GLOBAL_CONCURRENCY_TAG
from dagster._core.test_utils import instance_for_test
from dagster._utils.merger import merge_dicts

Expand Down Expand Up @@ -456,3 +459,53 @@ def get_dynamic_op_failure_job():
def test_dynamic_failure_retry(job_fn, config_fn):
TestStepHandler.reset()
assert_expected_failure_behavior(job_fn, config_fn)


@op(tags={GLOBAL_CONCURRENCY_TAG: "foo"})
def simple_op(context):
time.sleep(0.1)
foo_info = context.instance.event_log_storage.get_concurrency_info("foo")
return {"active": foo_info.active_slot_count, "pending": foo_info.pending_step_count}


@job(executor_def=test_step_delegating_executor)
def simple_job():
simple_op()


def test_blocked_concurrency_limits():
TestStepHandler.reset()
with tempfile.TemporaryDirectory() as temp_dir:
with instance_for_test(
temp_dir=temp_dir,
overrides={
"event_log_storage": {
"module": "dagster.utils.test",
"class": "ConcurrencyEnabledSqliteTestEventLogStorage",
"config": {"base_dir": temp_dir},
}
},
) as instance:
instance.event_log_storage.set_concurrency_slots("foo", 0)

def _unblock_concurrency_key(instance, timeout):
time.sleep(timeout)
instance.event_log_storage.set_concurrency_slots("foo", 1)

TIMEOUT = 3
threading.Thread(
target=_unblock_concurrency_key, args=(instance, TIMEOUT), daemon=True
).start()
with execute_job(reconstructable(simple_job), instance=instance) as result:
TestStepHandler.wait_for_processes()
assert result.success
assert any(
[
"blocked by concurrency limit for key foo" in (event.message or "")
for event in result.all_events
]
)
# the executor loop sleeps every second, so there should be at least a call per
# second that the steps are blocked, in addition to the processing of any step
# events
assert instance.event_log_storage.get_records_for_run_calls(result.run_id) <= 3

0 comments on commit 9481478

Please sign in to comment.