Skip to content

Commit

Permalink
support messaging events for concurrency keys (#17138)
Browse files Browse the repository at this point in the history
## Summary & Motivation
Emit engine events to indicate that a step is blocked waiting for a
concurrency slot to open up.

## How I Tested These Changes
BK
  • Loading branch information
prha authored Oct 12, 2023
1 parent f1cb3f2 commit 1f0f9cf
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 0 deletions.
16 changes: 16 additions & 0 deletions python_modules/dagster/dagster/_core/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,22 @@ def _msg():
message=_msg(),
)

@staticmethod
def step_concurrency_blocked(
step_context: IStepContext, concurrency_key: str, initial=True
) -> "DagsterEvent":
message = (
f"Step blocked by concurrency limit for key {concurrency_key}"
if initial
else f"Step still blocked by concurrency limit for key {concurrency_key}"
)
return DagsterEvent.from_step(
event_type=DagsterEventType.ENGINE_EVENT,
step_context=step_context,
message=message,
event_specific_data=EngineEventData(metadata={"concurrency_key": concurrency_key}),
)

@staticmethod
def job_start(job_context: IPlanContext) -> "DagsterEvent":
return DagsterEvent.from_job(
Expand Down
24 changes: 24 additions & 0 deletions python_modules/dagster/dagster/_core/execution/plan/active.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def _default_sort_key(step: ExecutionStep) -> float:


CONCURRENCY_CLAIM_BLOCKED_INTERVAL = 1
CONCURRENCY_CLAIM_MESSAGE_INTERVAL = 300


class ActiveExecution:
Expand Down Expand Up @@ -106,6 +107,7 @@ def __init__(
self._pending_retry: List[str] = []
self._pending_abandon: List[str] = []
self._waiting_to_retry: Dict[str, float] = {}
self._messaged_concurrency_slots: Dict[str, float] = {}

# then are considered _in_flight when vended via get_steps_to_*
self._in_flight: Set[str] = set()
Expand Down Expand Up @@ -637,3 +639,25 @@ def rebuild_from_events(
self.get_steps_to_execute()

return [self.get_step_by_key(step_key) for step_key in self._in_flight]

def concurrency_event_iterator(
self, plan_context: Union[PlanExecutionContext, PlanOrchestrationContext]
) -> Iterator[DagsterEvent]:
if not self._instance_concurrency_context:
return

pending_claims = self._instance_concurrency_context.pending_claim_steps()
for step_key in pending_claims:
last_messaged_timestamp = self._messaged_concurrency_slots.get(step_key)
if (
not last_messaged_timestamp
or time.time() - last_messaged_timestamp > CONCURRENCY_CLAIM_MESSAGE_INTERVAL
):
step = self.get_step_by_key(step_key)
step_context = plan_context.for_step(step)
step_concurrency_key = cast(str, step.tags.get(GLOBAL_CONCURRENCY_TAG))
self._messaged_concurrency_slots[step_key] = time.time()
is_initial_message = last_messaged_timestamp is None
yield DagsterEvent.step_concurrency_blocked(
step_context, step_concurrency_key, initial=is_initial_message
)
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ def inner_plan_execution_iterator(
while not active_execution.is_complete:
step = active_execution.get_next_step()

yield from active_execution.concurrency_event_iterator(job_context)

if not step:
active_execution.sleep_til_ready()
continue
Expand Down
2 changes: 2 additions & 0 deletions python_modules/dagster/dagster/_core/executor/multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ def execute(
if not steps:
break

yield from active_execution.concurrency_event_iterator(plan_context)

for step in steps:
step_context = plan_context.for_step(step)
term_events[step.key] = multiproc_ctx.Event()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,9 @@ def execute(self, plan_context: PlanOrchestrationContext, execution_plan: Execut
else:
max_steps_to_run = None # disables limit

# process events from concurrency blocked steps
list(active_execution.concurrency_event_iterator(plan_context))

for step in active_execution.get_steps_to_execute(max_steps_to_run):
running_steps[step.key] = step
list(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,23 @@ def test_parallel_concurrency(instance, parallel_recon_job):
assert foo_info.assigned_step_count == 0


def _has_concurrency_blocked_event(events, concurrency_key):
message_str = f"blocked by concurrency limit for key {concurrency_key}"
for event in events:
if message_str in event.message:
return True
return False


def test_concurrency_blocked_events(instance, parallel_recon_job_not_inprocess):
instance.event_log_storage.set_concurrency_slots("foo", 1)
foo_info = instance.event_log_storage.get_concurrency_info("foo")
assert foo_info.slot_count == 1

with execute_job(parallel_recon_job_not_inprocess, instance=instance) as result:
assert _has_concurrency_blocked_event(result.all_events, "foo")


def test_error_concurrency(instance, error_recon_job):
instance.event_log_storage.set_concurrency_slots("foo", 1)
foo_info = instance.event_log_storage.get_concurrency_info("foo")
Expand Down

0 comments on commit 1f0f9cf

Please sign in to comment.