Skip to content

Commit

Permalink
[dagster-aws] correctly use image from task definition in ECSRunLaunc…
Browse files Browse the repository at this point in the history
…her (#26346)

## Summary & Motivation

Looks like #23337 introduced a regression by checking if the user has
set `DAGSTER_CURRENT_IMAGE` in the code location too early.

This image is not used when a task definition is provided directly by
the user (instead of generating one automatically), so this check was
too early.

I moved the check to the appropriate location, provided correct type
annotations for the `image` argument to prevent similar confusion in the
future, and improved the exception message to be more explicit.

Resolve #26138

## How I Tested These Changes

## Changelog

[dagster-aws] Fixed a bug in the ECSRunLauncher that prevented it
from accepting a user-provided task definition when
DAGSTER_CURRENT_IMAGE was not set in the code location.
  • Loading branch information
danielgafni authored Dec 9, 2024
1 parent 53a5ad5 commit 2f58726
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
12 changes: 8 additions & 4 deletions python_modules/libraries/dagster-aws/dagster_aws/ecs/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
StringSource,
_check as check,
)
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.events import EngineEventData
from dagster._core.instance import T_DagsterInstance
from dagster._core.launcher.base import (
Expand Down Expand Up @@ -487,8 +488,6 @@ def launch_run(self, context: LaunchRunContext) -> None:
)
command = self._get_command_args(args, context)
image = self._get_image_for_run(context)
if image is None:
raise ValueError("Could not determine image for run")

run_task_kwargs = self._run_task_kwargs(run, image, container_context)

Expand Down Expand Up @@ -649,7 +648,7 @@ def _get_container_name(self, container_context: EcsContainerContext) -> str:
return container_context.container_name or self.container_name

def _run_task_kwargs(
self, run: DagsterRun, image: str, container_context: EcsContainerContext
self, run: DagsterRun, image: Optional[str], container_context: EcsContainerContext
) -> Dict[str, Any]:
"""Return a dictionary of args to launch the ECS task, registering a new task
definition if needed.
Expand All @@ -661,7 +660,7 @@ def _run_task_kwargs(

if container_context.task_definition_arn:
task_definition = container_context.task_definition_arn
else:
elif image is not None:
family = self._get_run_task_definition_family(run)

if self.task_definition_dict or not self.use_current_ecs_task_config:
Expand Down Expand Up @@ -755,6 +754,11 @@ def _run_task_kwargs(
)

task_definition = family
else:
# since image was not set, we cannot construct a task definition automatically
raise DagsterInvariantViolationError(
"Could not determine image to use for the run. It has to be provided in the code location: https://docs.dagster.io/concepts/code-locations/workspace-files#specifying-a-docker-image"
)

if self.use_current_ecs_task_config:
current_task_metadata = get_current_ecs_task_metadata()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def get_task_definition_dict_from_current_task(
ecs,
family,
current_task,
image,
image: str,
container_name,
environment,
command=None,
Expand Down

0 comments on commit 2f58726

Please sign in to comment.