Skip to content

Commit

Permalink
ECS Executor WIP
Browse files Browse the repository at this point in the history
Summary:
Proving out the possibility of building out an ECS executor. It assumes that it is launched via the EcsRunLauncher and can use the same task definition of the task in which it is launched, but still allows customizing of memory,cpu, ephemeral storage, and whatever else you can override via run_task arguments.
  • Loading branch information
gibsondan authored and danielgafni committed Dec 4, 2024
1 parent c67d4ea commit 4b01ac0
Show file tree
Hide file tree
Showing 8 changed files with 407 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ def execute(self, plan_context: PlanOrchestrationContext, execution_plan: Execut
instance_concurrency_context=instance_concurrency_context,
) as active_execution:
running_steps: Dict[str, ExecutionStep] = {}
step_worker_handles: Dict[str, Optional[str]] = {}

if plan_context.resume_from_failure:
DagsterEvent.engine_event(
Expand Down Expand Up @@ -211,7 +212,8 @@ def execute(self, plan_context: PlanOrchestrationContext, execution_plan: Execut

try:
health_check = self._step_handler.check_step_health(
step_handler_context
step_handler_context,
step_worker_handle=None,
)
except Exception:
# For now we assume that an exception indicates that the step should be resumed.
Expand All @@ -237,15 +239,14 @@ def execute(self, plan_context: PlanOrchestrationContext, execution_plan: Execut

if should_retry_step:
# health check failed, launch the step
list(
self._step_handler.launch_step(
self._get_step_handler_context(
plan_context, [step], active_execution
)
self._step_handler.launch_step(
self._get_step_handler_context(
plan_context, [step], active_execution
)
)

running_steps[step.key] = step
step_worker_handles[step.key] = None

last_check_step_health_time = get_current_datetime()

Expand All @@ -262,13 +263,12 @@ def execute(self, plan_context: PlanOrchestrationContext, execution_plan: Execut
"Executor received termination signal, forwarding to steps",
EngineEventData.interrupted(list(running_steps.keys())),
)
for step in running_steps.values():
list(
self._step_handler.terminate_step(
self._get_step_handler_context(
plan_context, [step], active_execution
)
)
for step_key, step in running_steps.items():
self._step_handler.terminate_step(
self._get_step_handler_context(
plan_context, [step], active_execution
),
step_worker_handle=step_worker_handles[step_key],
)
else:
DagsterEvent.engine_event(
Expand Down Expand Up @@ -311,6 +311,7 @@ def execute(self, plan_context: PlanOrchestrationContext, execution_plan: Execut
):
assert isinstance(dagster_event.step_key, str)
del running_steps[dagster_event.step_key]
del step_worker_handles[dagster_event.step_key]

if not dagster_event.is_step_up_for_retry:
active_execution.verify_complete(
Expand All @@ -325,14 +326,15 @@ def execute(self, plan_context: PlanOrchestrationContext, execution_plan: Execut
curr_time - last_check_step_health_time
).total_seconds() >= self._check_step_health_interval_seconds:
last_check_step_health_time = curr_time
for step in running_steps.values():
for step_key, step in running_steps.items():
step_context = plan_context.for_step(step)

try:
health_check_result = self._step_handler.check_step_health(
self._get_step_handler_context(
plan_context, [step], active_execution
)
),
step_worker_handle=step_worker_handles[step_key],
)
if not health_check_result.is_healthy:
health_check_error = SerializableErrorInfo(
Expand Down Expand Up @@ -374,11 +376,9 @@ def execute(self, plan_context: PlanOrchestrationContext, execution_plan: Execut

for step in active_execution.get_steps_to_execute(max_steps_to_run):
running_steps[step.key] = step
list(
self._step_handler.launch_step(
self._get_step_handler_context(
plan_context, [step], active_execution
)
step_worker_handles[step.key] = self._step_handler.launch_step(
self._get_step_handler_context(
plan_context, [step], active_execution
)
)

Expand All @@ -398,12 +398,11 @@ def execute(self, plan_context: PlanOrchestrationContext, execution_plan: Execut
error=serializable_error,
),
)
for step in running_steps.values():
list(
self._step_handler.terminate_step(
self._get_step_handler_context(
plan_context, [step], active_execution
)
)
for step_key, step in running_steps.items():
self._step_handler.terminate_step(
self._get_step_handler_context(
plan_context, [step], active_execution
),
step_worker_handle=step_worker_handles[step_key],
)
raise
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from abc import ABC, abstractmethod
from typing import Iterator, Mapping, NamedTuple, Optional, Sequence
from typing import Mapping, NamedTuple, Optional, Sequence

from dagster import (
DagsterInstance,
_check as check,
)
from dagster._core.events import DagsterEvent
from dagster._core.execution.context.system import IStepContext, PlanOrchestrationContext
from dagster._core.execution.plan.step import ExecutionStep
from dagster._core.storage.dagster_run import DagsterRun
Expand Down Expand Up @@ -83,13 +82,17 @@ def name(self) -> str:
pass

@abstractmethod
def launch_step(self, step_handler_context: StepHandlerContext) -> Iterator[DagsterEvent]:
def launch_step(self, step_handler_context: StepHandlerContext) -> Optional[str]:
pass

@abstractmethod
def check_step_health(self, step_handler_context: StepHandlerContext) -> CheckStepHealthResult:
def check_step_health(
self, step_handler_context: StepHandlerContext, step_worker_handle: Optional[str]
) -> CheckStepHealthResult:
pass

@abstractmethod
def terminate_step(self, step_handler_context: StepHandlerContext) -> Iterator[DagsterEvent]:
def terminate_step(
self, step_handler_context: StepHandlerContext, step_worker_handle: Optional[str]
) -> None:
pass
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from dagster_aws.ecs.executor import ecs_executor as ecs_executor
from dagster_aws.ecs.launcher import EcsRunLauncher as EcsRunLauncher
from dagster_aws.ecs.tasks import EcsEventualConsistencyTimeout as EcsEventualConsistencyTimeout
Loading

0 comments on commit 4b01ac0

Please sign in to comment.