From d670828b0df322715ee2c0d750b9dc25add008ff Mon Sep 17 00:00:00 2001
From: prha <1040172+prha@users.noreply.github.com>
Date: Thu, 12 Oct 2023 12:56:26 -0700
Subject: [PATCH] add default support for backing off of checking concurrency claims (#17109)

## Summary & Motivation

Instead of checking for an open concurrency slot every second, back off exponentially on each blocked attempt, starting at a 1-second interval and capping at 15 seconds between checks.

This relieves db pressure from repeatedly checking contentious rows. It shouldn't affect starvation, since we still have priority checks.

## How I Tested These Changes

BK
---
 .../plan/instance_concurrency_context.py | 27 +++++++++++---
 .../test_instance_concurrency_context.py | 37 ++++++++++++++++---
 2 files changed, 54 insertions(+), 10 deletions(-)

diff --git a/python_modules/dagster/dagster/_core/execution/plan/instance_concurrency_context.py b/python_modules/dagster/dagster/_core/execution/plan/instance_concurrency_context.py
index 0f1c8d1d6df30..d87d3c7639d10 100644
--- a/python_modules/dagster/dagster/_core/execution/plan/instance_concurrency_context.py
+++ b/python_modules/dagster/dagster/_core/execution/plan/instance_concurrency_context.py
@@ -12,7 +12,9 @@
 from dagster._core.instance import DagsterInstance
 
 
-DEFAULT_CONCURRENCY_CLAIM_BLOCKED_INTERVAL = 1
+INITIAL_INTERVAL_VALUE = 1
+STEP_UP_BASE = 1.1
+MAX_CONCURRENCY_CLAIM_BLOCKED_INTERVAL = 15
 
 
 class InstanceConcurrencyContext:
@@ -34,6 +36,7 @@ def __init__(self, instance: DagsterInstance, run_id: str):
         self._run_id = run_id
         self._global_concurrency_keys = None
         self._pending_timeouts = defaultdict(float)
+        self._pending_claim_counts = defaultdict(int)
         self._pending_claims = set()
         self._claims = set()
 
@@ -54,6 +57,7 @@ def __exit__(
 
         for step_key in to_clear:
             del self._pending_timeouts[step_key]
+            del self._pending_claim_counts[step_key]
             self._pending_claims.remove(step_key)
 
         self._context_guard = False
@@ -89,12 +93,11 @@ def claim(self, concurrency_key: str, step_key: str, priority: int = 0):
         )
 
         if not claim_status.is_claimed:
-            interval = (
-                claim_status.sleep_interval
-                if claim_status.sleep_interval
-                else DEFAULT_CONCURRENCY_CLAIM_BLOCKED_INTERVAL
+            interval = _calculate_timeout_interval(
+                claim_status.sleep_interval, self._pending_claim_counts[step_key]
             )
             self._pending_timeouts[step_key] = time.time() + interval
+            self._pending_claim_counts[step_key] += 1
             return False
 
         if step_key in self._pending_claims:
@@ -122,3 +125,17 @@ def free_step(self, step_key) -> None:
 
         self._instance.event_log_storage.free_concurrency_slot_for_step(self._run_id, step_key)
         self._claims.remove(step_key)
+
+
+def _calculate_timeout_interval(sleep_interval: Optional[float], pending_claim_count: int) -> float:
+    if sleep_interval is not None:
+        return sleep_interval
+
+    if pending_claim_count > 30:
+        # with the current values, we will always hit the max by the 30th claim attempt
+        return MAX_CONCURRENCY_CLAIM_BLOCKED_INTERVAL
+
+    # increase the step up value exponentially, up to a max of 15 seconds (starting from 0)
+    step_up_value = STEP_UP_BASE**pending_claim_count - 1
+    interval = INITIAL_INTERVAL_VALUE + step_up_value
+    return min(MAX_CONCURRENCY_CLAIM_BLOCKED_INTERVAL, interval)
diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_instance_concurrency_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_instance_concurrency_context.py
index c47f3607ec492..12f1d0f7e82fa 100644
--- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_instance_concurrency_context.py
+++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_instance_concurrency_context.py
@@ -2,7 +2,8 @@
 
 import pytest
 from dagster._core.execution.plan.instance_concurrency_context import (
-    DEFAULT_CONCURRENCY_CLAIM_BLOCKED_INTERVAL,
+    INITIAL_INTERVAL_VALUE,
+    STEP_UP_BASE,
     InstanceConcurrencyContext,
 )
 from dagster._core.utils import make_new_run_id
@@ -82,11 +83,37 @@ def test_default_interval(concurrency_instance):
         # we have not waited long enough to query the db again
         assert concurrency_instance.event_log_storage.get_check_calls("b") == call_count
 
-        time.sleep(DEFAULT_CONCURRENCY_CLAIM_BLOCKED_INTERVAL)
+        time.sleep(INITIAL_INTERVAL_VALUE)
         context.claim("foo", "b")
         assert concurrency_instance.event_log_storage.get_check_calls("b") == call_count + 1
 
 
+def test_backoff_interval(concurrency_instance):
+    run_id = make_new_run_id()
+    concurrency_instance.event_log_storage.set_concurrency_slots("foo", 1)
+
+    with InstanceConcurrencyContext(concurrency_instance, run_id) as context:
+        assert context.claim("foo", "a")
+        assert not context.claim("foo", "b")
+        call_count = concurrency_instance.event_log_storage.get_check_calls("b")
+
+        context.claim("foo", "b")
+        # we have not waited long enough to query the db again
+        assert concurrency_instance.event_log_storage.get_check_calls("b") == call_count
+
+        time.sleep(INITIAL_INTERVAL_VALUE)
+        context.claim("foo", "b")
+        assert concurrency_instance.event_log_storage.get_check_calls("b") == call_count + 1
+
+        # sleeping another second will not incur another check call, there's an exponential backoff
+        time.sleep(INITIAL_INTERVAL_VALUE)
+        context.claim("foo", "b")
+        assert concurrency_instance.event_log_storage.get_check_calls("b") == call_count + 1
+        time.sleep(STEP_UP_BASE - INITIAL_INTERVAL_VALUE)
+        context.claim("foo", "b")
+        assert concurrency_instance.event_log_storage.get_check_calls("b") == call_count + 2
+
+
 def test_custom_interval(concurrency_custom_sleep_instance):
     run_id = make_new_run_id()
     storage = concurrency_custom_sleep_instance.event_log_storage
@@ -101,13 +128,13 @@ def test_custom_interval(concurrency_custom_sleep_instance):
         # we have not waited long enough to query the db again
         assert storage.get_check_calls("b") == call_count
 
-        assert DEFAULT_CONCURRENCY_CLAIM_BLOCKED_INTERVAL < CUSTOM_SLEEP_INTERVAL
-        time.sleep(DEFAULT_CONCURRENCY_CLAIM_BLOCKED_INTERVAL)
+        assert INITIAL_INTERVAL_VALUE < CUSTOM_SLEEP_INTERVAL
+        time.sleep(INITIAL_INTERVAL_VALUE)
         context.claim("foo", "b")
         # we have waited the default interval, but not the custom interval
         assert storage.get_check_calls("b") == call_count
 
-        interval_to_custom = CUSTOM_SLEEP_INTERVAL - DEFAULT_CONCURRENCY_CLAIM_BLOCKED_INTERVAL
+        interval_to_custom = CUSTOM_SLEEP_INTERVAL - INITIAL_INTERVAL_VALUE
        time.sleep(interval_to_custom)
         context.claim("foo", "b")
         assert storage.get_check_calls("b") == call_count + 1
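
Reviewer note: to eyeball the backoff schedule this patch introduces without running the tests, here is a minimal standalone sketch of the math in `_calculate_timeout_interval`. The constants are copied from the patch; `blocked_check_interval` is a hypothetical helper name used only for this illustration, and it covers only the default branch (no per-key `sleep_interval` override from the event log storage).

```python
# Sketch only: mirrors the default branch of _calculate_timeout_interval from
# this patch. blocked_check_interval is a made-up name for illustration.
INITIAL_INTERVAL_VALUE = 1
STEP_UP_BASE = 1.1
MAX_CONCURRENCY_CLAIM_BLOCKED_INTERVAL = 15


def blocked_check_interval(pending_claim_count: int) -> float:
    # after ~30 blocked attempts the formula is always at the cap anyway
    if pending_claim_count > 30:
        return MAX_CONCURRENCY_CLAIM_BLOCKED_INTERVAL
    # exponential step-up starting from 0, added to the 1-second base interval
    step_up_value = STEP_UP_BASE**pending_claim_count - 1
    return min(MAX_CONCURRENCY_CLAIM_BLOCKED_INTERVAL, INITIAL_INTERVAL_VALUE + step_up_value)


if __name__ == "__main__":
    for attempt in (0, 1, 5, 10, 20, 28, 29, 40):
        print(f"attempt {attempt:2d}: wait {blocked_check_interval(attempt):.2f}s before re-checking")
```

With these constants the wait grows from 1.0s on the first blocked attempt to roughly 2.6s after 10 attempts and 6.7s after 20, reaching the 15s cap around the 29th attempt. When the event log storage returns an explicit `sleep_interval`, that value is used directly instead, as the `test_custom_interval` case above exercises.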