Skip to content

Commit

Permalink
Add support for ECS External launch type (#16715)
Browse files Browse the repository at this point in the history
## Summary & Motivation
We are using Dagster in AWS using the ECS launcher. For certain tasks we
are looking to move to Dagster the software has to be run on machines
that are in our data center. We would like to be able to use the Dagster
with ECS using ECS Anywhere, but found when trying to launch tasks with
the External launch type Dagster would fail to start tasks (attached
error in screenshot). The error was created during the RunTask boto3
call since the awsvpc networkMode is only valid if the launch type is
"EC2" or "FARGATE". This change adds a check for the launch type before
applying the aws vpc network configuration settings.

## How I Tested These Changes
I changed the dagster-aws module to be pip installed from my git
repository and was able to run tasks on External instances.

<img width="1218" alt="networkMode_error"
src="https://github.com/dagster-io/dagster/assets/5329455/ba2e5de6-519c-4941-a7b1-d48b6e3404e4">

---------

Co-authored-by: gibsondan <[email protected]>
  • Loading branch information
cuttius and gibsondan authored Oct 9, 2023
1 parent 5674ab9 commit 38218ed
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 28 deletions.
56 changes: 28 additions & 28 deletions python_modules/libraries/dagster-aws/dagster_aws/ecs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,38 +357,38 @@ def get_task_kwargs_from_current_task(
cluster,
task,
):
enis = []
subnets = []
for attachment in task["attachments"]:
if attachment["type"] == "ElasticNetworkInterface":
for detail in attachment["details"]:
if detail["name"] == "subnetId":
subnets.append(detail["value"])
if detail["name"] == "networkInterfaceId":
enis.append(ec2.NetworkInterface(detail["value"]))

public_ip = False
security_groups = []
for eni in enis:
if (eni.association_attribute or {}).get("PublicIp"):
public_ip = True
for group in eni.groups:
security_groups.append(group["GroupId"])

run_task_kwargs = {
"cluster": cluster,
"networkConfiguration": {
"awsvpcConfiguration": {
"subnets": subnets,
"assignPublicIp": "ENABLED" if public_ip else "DISABLED",
"securityGroups": security_groups,
},
},
}
run_task_kwargs = {"cluster": cluster}

if not task.get("capacityProviderStrategy"):
run_task_kwargs["launchType"] = task.get("launchType") or "FARGATE"
else:
run_task_kwargs["capacityProviderStrategy"] = task.get("capacityProviderStrategy")

if run_task_kwargs["launchType"] != "EXTERNAL":
enis = []
subnets = []
for attachment in task["attachments"]:
if attachment["type"] == "ElasticNetworkInterface":
for detail in attachment["details"]:
if detail["name"] == "subnetId":
subnets.append(detail["value"])
if detail["name"] == "networkInterfaceId":
enis.append(ec2.NetworkInterface(detail["value"]))

public_ip = False
security_groups = []

for eni in enis:
if (eni.association_attribute or {}).get("PublicIp"):
public_ip = True
for group in eni.groups:
security_groups.append(group["GroupId"])

aws_vpc_config = {
"subnets": subnets,
"assignPublicIp": "ENABLED" if public_ip else "DISABLED",
"securityGroups": security_groups,
}
run_task_kwargs["networkConfiguration"] = {"awsvpcConfiguration": aws_vpc_config}

return run_task_kwargs
Original file line number Diff line number Diff line change
Expand Up @@ -1221,3 +1221,59 @@ def test_custom_launcher(
== WorkerStatus.RUNNING
)
ecs.stop_task(task=task_arn)


def test_external_launch_type(
ecs,
instance_cm,
workspace,
external_job,
job,
):
container_name = "external"

task_definition = ecs.register_task_definition(
family="external",
containerDefinitions=[{"name": container_name, "image": "dagster:first"}],
networkMode="bridge",
memory="512",
cpu="256",
)["taskDefinition"]

assert task_definition["networkMode"] == "bridge"

task_definition_arn = task_definition["taskDefinitionArn"]

# You can provide a family or a task definition ARN
with instance_cm(
{
"task_definition": task_definition_arn,
"container_name": container_name,
"run_task_kwargs": {
"launchType": "EXTERNAL",
},
}
) as instance:
run = instance.create_run_for_job(
job,
external_job_origin=external_job.get_external_origin(),
job_code_origin=external_job.get_python_origin(),
)

initial_task_definitions = ecs.list_task_definitions()["taskDefinitionArns"]
initial_tasks = ecs.list_tasks()["taskArns"]

instance.launch_run(run.run_id, workspace)

# A new task definition is not created
assert ecs.list_task_definitions()["taskDefinitionArns"] == initial_task_definitions

# A new task is launched
tasks = ecs.list_tasks()["taskArns"]

assert len(tasks) == len(initial_tasks) + 1
task_arn = next(iter(set(tasks).difference(initial_tasks)))
task = ecs.describe_tasks(tasks=[task_arn])["tasks"][0]

assert task["taskDefinitionArn"] == task_definition["taskDefinitionArn"]
assert task["launchType"] == "EXTERNAL"

0 comments on commit 38218ed

Please sign in to comment.