Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Launch efa instances from heterogenous reservations #3768

Merged
merged 25 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions test/dlc_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,6 @@ def ei_accelerator_type(request):
def efa_ec2_instances(
request,
ec2_client,
ec2_resource,
ec2_instance_type,
ec2_instance_role_name,
ec2_key_name,
Expand Down Expand Up @@ -401,16 +400,14 @@ def delete_ssh_keypair():
{"ResourceType": "instance", "Tags": [{"Key": "Name", "Value": instance_name_prefix}]}
],
}
response = ec2_utils.launch_efa_instances_with_retry(
instances = ec2_utils.launch_efa_instances_with_retry(
ec2_client,
ec2_instance_type,
availability_zone_options,
ec2_run_instances_definition,
fn_name=request.node.name,
)

instances = response["Instances"]

def terminate_efa_instances():
ec2_client.terminate_instances(
InstanceIds=[instance_info["InstanceId"] for instance_info in instances]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# language governing permissions and limitations under the License.
from __future__ import absolute_import

import os

import pytest

from packaging.version import Version
Expand All @@ -35,6 +37,10 @@ def can_run_pytorchddp(ecr_image):
return Version(image_framework_version) in SpecifierSet(">=1.10")


@pytest.mark.skipif(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can remove these conditional skips if we still want to run these

os.getenv("SM_EFA_TEST_INSTANCE_TYPE") == "p5.48xlarge",
reason="Low availability of instance type; Must ensure test works on new instances.",
)
@pytest.mark.processor("gpu")
@pytest.mark.model("N/A")
@pytest.mark.multinode(2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# language governing permissions and limitations under the License.
from __future__ import absolute_import

import os

import pytest

from packaging.version import Version
Expand All @@ -35,7 +37,10 @@ def can_run_pytorchddp(ecr_image):
return Version(image_framework_version) in SpecifierSet(">=1.10")


# Skip due to known issue: https://github.com/pytorch/pytorch/issues/99074
@pytest.mark.skipif(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can remove these conditional skips if we still want to run these

os.getenv("SM_EFA_TEST_INSTANCE_TYPE") == "p5.48xlarge",
reason="Low availability of instance type; Must ensure test works on new instances.",
)
@pytest.mark.processor("gpu")
@pytest.mark.model("N/A")
@pytest.mark.multinode(2)
Expand Down
279 changes: 230 additions & 49 deletions test/test_utils/ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import logging
import sys
import uuid
import copy

from collections import Counter

from inspect import signature

Expand Down Expand Up @@ -406,6 +409,218 @@ def launch_instances_with_retry(
return instances


def launch_efa(ec2_client, ec2_instance_type, ec2_run_instances_definition, availability_zone):
    """
    Launch EFA-enabled instances in a single availability zone.

    The run-instances definition passed in is deep-copied before the placement and network
    interface settings are applied, so the caller's dict is never mutated.

    Args:
        ec2_client (boto3.client): boto3 ec2 client
        ec2_instance_type (str): instance type used to generate the network interfaces
        ec2_run_instances_definition (dict): key/value pairs for run instances launch cmd
        availability_zone (str): availability zone to place the instances in

    Returns:
        list: the "Instances" entries of the run_instances response, or an empty list when the
            response is empty or carries no instances
    """
    ec2_efa_run_instances_definition = copy.deepcopy(ec2_run_instances_definition)
    ec2_efa_run_instances_definition.update(
        {
            "Placement": {"AvailabilityZone": availability_zone},
            "NetworkInterfaces": generate_network_interfaces(
                ec2_client, ec2_instance_type, availability_zone
            ),
        }
    )
    response = ec2_client.run_instances(**ec2_efa_run_instances_definition) or {}
    # Always hand back a list: callers extend their own lists with this result
    # (`instances += launch_efa(...)`), which would raise TypeError on None.
    return response.get("Instances") or []


def launch_efa_with_reservations(
    ec2_client, ec2_instance_type, reservations, ec2_run_instances_definition, fn_name=""
):
    """
    Try each capacity reservation in order and return the first successful launch.

    Reservations are consumed from the front of the `reservations` list that is passed in
    (the list is mutated). A ClientError on one reservation is logged at debug level and the
    next reservation is tried.

    Args:
        ec2_client (boto3.client): boto3 ec2 client
        ec2_instance_type (str): instance type to launch
        reservations (list): reservation dicts with "CapacityReservationId" and
            "AvailabilityZone" keys
        ec2_run_instances_definition (dict): key/value pairs for run instances launch cmd
        fn_name (str, optional): pytest function name, used for logging. Defaults to "".

    Returns:
        list: launched instances, or an empty list if no reservation yielded any
    """
    reserved_definition = copy.deepcopy(ec2_run_instances_definition)
    while reservations:
        reservation = reservations.pop(0)
        reservation_target = {"CapacityReservationId": reservation["CapacityReservationId"]}
        reserved_definition["CapacityReservationSpecification"] = {
            "CapacityReservationTarget": reservation_target
        }
        try:
            instances = launch_efa(
                ec2_client,
                ec2_instance_type,
                reserved_definition,
                reservation["AvailabilityZone"],
            )
            if not instances:
                # Nothing came back from this reservation; move on to the next one.
                continue
            LOGGER.info(
                f"Your EFA reservation is ready for {fn_name}, please wait to be seated. Launching..."
            )
            if is_mainline_context():
                LOGGER.info(f"Launched EFA enabled instance for {fn_name} via {reservation}")
            return instances
        except ClientError as e:
            LOGGER.debug(
                f"Failed to launch EFA instance for {fn_name} from reservation due to {e}\n"
                "Checking additional open reservations..."
            )
    return []


def validate_efa_instance_conditions(instances, minimum_number_of_instances):
    """
    Check whether exactly the required number of instances has been launched.

    Args:
        instances (list): instances launched so far
        minimum_number_of_instances (int): exact number of instances required

    Raises:
        RuntimeError: if more instances were launched than required

    Returns:
        bool: True when the count matches exactly, False when there are too few
    """
    launched_count = len(instances)
    if launched_count > minimum_number_of_instances:
        raise RuntimeError(
            f"Launched too many instances somehow, raising and cleaning up - {instances}; min/max_allowed = {minimum_number_of_instances}"
        )
    return launched_count == minimum_number_of_instances


class HeterogenousReservationError(Exception):
    """Raised when launching EFA instances via the heterogenous reservation workflow fails."""


def referesh_capacity_reservations(ec2_client, ec2_instance_type, az):
    """
    Re-query the available capacity reservations and keep those in one availability zone.

    Args:
        ec2_client (boto3.client): boto3 ec2 client
        ec2_instance_type (str): instance type to look up reservations for
        az (str): availability zone to filter on

    Returns:
        tuple: (list of reservations in `az`, sum of their "AvailableInstanceCount" values)
    """
    reservations_in_az = []
    available_total = 0
    for reservation in get_available_reservations(ec2_client, ec2_instance_type):
        if reservation["AvailabilityZone"] != az:
            continue
        reservations_in_az.append(reservation)
        available_total += reservation["AvailableInstanceCount"]
    return reservations_in_az, available_total


def _terminate_launched_instances(ec2_client, instances):
    """Terminate instances left over from a failed heterogenous launch attempt (no-op if empty)."""
    if not instances:
        return
    instance_ids = [instance_info["InstanceId"] for instance_info in instances]
    # Log the concrete IDs; interpolating a generator expression would only print its repr.
    LOGGER.info(f"Cleaning up instances {instance_ids}...")
    ec2_client.terminate_instances(InstanceIds=instance_ids)


def launch_efa_with_heterogenous_reservations(ec2_client, ec2_run_instances_definition, fn_name=""):
    """
    Launch efa instances with heterogenous reservations

    Previous EFA launch code requires instances to be launched from the same command. This prohibits launching instances
    from multiple capacity reservations if the reservation has less than the minimum available instances required (typically 2).

    To remedy this, we group reservations by availability zone. If we have instances available in reservation, we
    group by most common availability zone and try to launch multiple instances from reservation. If we do not meet our minimum
    requirements, try launching from public pool to remedy the situation. If we launch 0 from reservation, do not
    try launching from the public pool, and allow other functions to handle launching exclusively from public.

    Instances launched in an availability zone that does not reach the required count are
    terminated before the next availability zone is tried.

    Args:
        ec2_client (boto3.client): boto3 ec2 client
        ec2_run_instances_definition (dict): key/value pairs for run instances launch cmd;
            "InstanceType" and "MinCount" are read from it. The dict itself is not mutated.
        fn_name (str, optional): pytest function name. Defaults to "".

    Raises:
        HeterogenousReservationError: when any exception other than ClientError occurs while
            launching in an availability zone (including launching too many instances)

    Returns:
        list: launched instances, or an empty list if no availability zone could satisfy the request
    """
    ec2_heterogenous_run_instances_definition = copy.deepcopy(ec2_run_instances_definition)
    ec2_instance_type = ec2_heterogenous_run_instances_definition["InstanceType"]
    minimum_number_of_instances = ec2_heterogenous_run_instances_definition["MinCount"]

    # Reset max and min count to 1; instances are launched one at a time so that each one
    # can come from a different capacity reservation.
    ec2_heterogenous_run_instances_definition["MaxCount"] = 1
    ec2_heterogenous_run_instances_definition["MinCount"] = 1

    reserved_azs = [
        reservation["AvailabilityZone"]
        for reservation in ec2_client.describe_capacity_reservations()["CapacityReservations"]
        if reservation["InstanceType"] == ec2_instance_type
    ]

    tmp_reservations = get_available_reservations(
        ec2_client=ec2_client,
        instance_type=ec2_instance_type,
        min_availability=ec2_heterogenous_run_instances_definition["MinCount"],
    )

    # Prioritize availability zones by how many available reservations they hold
    az_counter = Counter(reservation["AvailabilityZone"] for reservation in tmp_reservations)
    az_priorities = [c[0] for c in az_counter.most_common()]

    # Track all reserved availability zones, in case capacity comes later
    for reserved_az in reserved_azs:
        if reserved_az not in az_priorities:
            az_priorities.append(reserved_az)

    for az in az_priorities:
        LOGGER.info(f"Checking AZ {az}")
        # Refresh reservations for each AZ
        reservations, available_instances = referesh_capacity_reservations(
            ec2_client, ec2_instance_type, az
        )
        # The public-pool fallback below may have raised these counts for a previous AZ
        ec2_heterogenous_run_instances_definition["MaxCount"] = 1
        ec2_heterogenous_run_instances_definition["MinCount"] = 1
        instances = []
        try:
            while available_instances and len(instances) < minimum_number_of_instances:
                LOGGER.info(f"trying to launch {ec2_instance_type} in {az}")
                instance = launch_efa_with_reservations(
                    ec2_client=ec2_client,
                    ec2_instance_type=ec2_instance_type,
                    reservations=reservations,
                    ec2_run_instances_definition=ec2_heterogenous_run_instances_definition,
                    fn_name=fn_name,
                )
                if not instance:
                    # Every reservation in this AZ failed to launch even though capacity is
                    # still reported. Stop here instead of spinning forever on the same
                    # reservations; the fallback / next AZ logic below takes over.
                    break
                instances += instance

                # Refresh reservations for each AZ
                reservations, available_instances = referesh_capacity_reservations(
                    ec2_client, ec2_instance_type, az
                )

            if validate_efa_instance_conditions(instances, minimum_number_of_instances):
                LOGGER.info("Strung together some reservations, let's go")
                return instances

            # If we have remaining instances, try launching from public pool
            # Try a different availability zone if we don't have any reservation launches, however. Always
            # prioritize reservation launches in this function.
            remaining_instances = minimum_number_of_instances - len(instances)
            if remaining_instances != minimum_number_of_instances:
                LOGGER.info(
                    f"Have {remaining_instances} remaining_instances instances in {az}. Trying from public pool."
                )
                ec2_heterogenous_run_instances_definition["MaxCount"] = remaining_instances
                ec2_heterogenous_run_instances_definition["MinCount"] = remaining_instances
                instances += launch_efa(
                    ec2_client, ec2_instance_type, ec2_heterogenous_run_instances_definition, az
                )

            if validate_efa_instance_conditions(instances, minimum_number_of_instances):
                LOGGER.info("Strung together some reservations and some walk-ins, let's go")
                return instances

            # Clean up instances if this workflow did not succeed
            LOGGER.info(
                f"Failed to launch enough instances from public and reservations for {fn_name}."
            )
            _terminate_launched_instances(ec2_client, instances)

        except ClientError as e:
            # Clean up any remaining instances, then move on to the next availability zone
            LOGGER.info(
                f"Failed to launch EFA instance for {fn_name} from reservation due to {e}\n"
                "Checking additional open reservations and cleaning up stray resources"
            )
            _terminate_launched_instances(ec2_client, instances)

        except Exception as e:
            # Unexpected failure: clean up and surface a dedicated error to the caller
            _terminate_launched_instances(ec2_client, instances)
            raise HeterogenousReservationError("Failed to launch via heterogenous approach") from e
    return []


@retry(
reraise=True,
stop=stop_after_delay(30 * 60), # Keep retrying for 30 minutes
Expand All @@ -428,75 +643,41 @@ def launch_efa_instances_with_retry(
:param fn_name: string - function name for ease of logging
:return: list of launched instance dicts (the "Instances" entries of the ec2_client.run_instances response)
"""
response = None
region = ec2_client.meta.region_name
reservations = get_available_reservations(
LOGGER.info(f"Trying to launch {ec2_instance_type} for {fn_name} via capacity reservation...")

heterogenous_reservation_launch = launch_efa_with_heterogenous_reservations(
ec2_client=ec2_client,
instance_type=ec2_run_instances_definition["InstanceType"],
min_availability=ec2_run_instances_definition["MinCount"],
ec2_run_instances_definition=ec2_run_instances_definition,
fn_name=fn_name,
)

# Try launching via reservation first
while reservations:
reservation = reservations.pop(0)
ec2_run_instances_definition["CapacityReservationSpecification"] = {
"CapacityReservationTarget": {
"CapacityReservationId": reservation["CapacityReservationId"]
}
}
ec2_run_instances_definition.update(
{
"Placement": {"AvailabilityZone": reservation["AvailabilityZone"]},
"NetworkInterfaces": generate_network_interfaces(
ec2_client, ec2_instance_type, reservation["AvailabilityZone"]
),
}
)
try:
response = ec2_client.run_instances(**ec2_run_instances_definition)
if response and response["Instances"]:
LOGGER.info(
f"Your EFA reservation is ready for {fn_name}, please wait to be seated. Launching..."
)
if is_mainline_context():
LOGGER.info(f"Launched EFA enabled instance for {fn_name} via {reservation}")
return response
except ClientError as e:
LOGGER.debug(
f"Failed to launch EFA instance for {fn_name} from reservation due to {e}\n"
"Checking additional open reservations..."
)
if heterogenous_reservation_launch:
return heterogenous_reservation_launch

# Clean up capacity reservation if it failed
ec2_run_instances_definition.pop("CapacityReservationSpecification", None)
LOGGER.info(
f"Looks like you didn't have an EFA reservation for {fn_name}, let's see if we can seat you as a walk-in..."
)

instances = []
for availability_zone in availability_zone_options:
ec2_run_instances_definition.update(
{
"Placement": {"AvailabilityZone": availability_zone},
"NetworkInterfaces": generate_network_interfaces(
ec2_client, ec2_instance_type, availability_zone
),
}
)
try:
response = ec2_client.run_instances(**ec2_run_instances_definition)
if response and response["Instances"]:
instances = launch_efa(
ec2_client, ec2_instance_type, ec2_run_instances_definition, availability_zone
)
if instances:
break
except ClientError as e:
LOGGER.debug(
LOGGER.info(
f"Failed to launch in {availability_zone} for {fn_name} due to {e}\n"
"Retrying in the next availability zone."
)
continue
if not (response and response["Instances"]):
if not instances:
raise RuntimeError(
f"Unable to launch {ec2_instance_type} instances in {region} for {fn_name}"
)
return response
return instances


def get_ec2_client(region):
Expand Down
Loading