Skip to content

Commit

Permalink
Launch efa instances from heterogenous reservations (#3768)
Browse files Browse the repository at this point in the history
  • Loading branch information
arjkesh authored Mar 27, 2024
1 parent 0eb3a42 commit 6adcb21
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 54 deletions.
5 changes: 1 addition & 4 deletions test/dlc_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,6 @@ def ei_accelerator_type(request):
def efa_ec2_instances(
request,
ec2_client,
ec2_resource,
ec2_instance_type,
ec2_instance_role_name,
ec2_key_name,
Expand Down Expand Up @@ -400,16 +399,14 @@ def delete_ssh_keypair():
{"ResourceType": "instance", "Tags": [{"Key": "Name", "Value": instance_name_prefix}]}
],
}
response = ec2_utils.launch_efa_instances_with_retry(
instances = ec2_utils.launch_efa_instances_with_retry(
ec2_client,
ec2_instance_type,
availability_zone_options,
ec2_run_instances_definition,
fn_name=request.node.name,
)

instances = response["Instances"]

def terminate_efa_instances():
ec2_client.terminate_instances(
InstanceIds=[instance_info["InstanceId"] for instance_info in instances]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# language governing permissions and limitations under the License.
from __future__ import absolute_import

import os

import pytest

from packaging.version import Version
Expand All @@ -35,6 +37,10 @@ def can_run_pytorchddp(ecr_image):
return Version(image_framework_version) in SpecifierSet(">=1.10")


@pytest.mark.skipif(
os.getenv("SM_EFA_TEST_INSTANCE_TYPE") == "p5.48xlarge",
reason="Low availability of instance type; Must ensure test works on new instances.",
)
@pytest.mark.processor("gpu")
@pytest.mark.model("N/A")
@pytest.mark.multinode(2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# language governing permissions and limitations under the License.
from __future__ import absolute_import

import os

import pytest

from packaging.version import Version
Expand All @@ -35,7 +37,10 @@ def can_run_pytorchddp(ecr_image):
return Version(image_framework_version) in SpecifierSet(">=1.10")


# Skip due to known issue: https://github.com/pytorch/pytorch/issues/99074
@pytest.mark.skipif(
os.getenv("SM_EFA_TEST_INSTANCE_TYPE") == "p5.48xlarge",
reason="Low availability of instance type; Must ensure test works on new instances.",
)
@pytest.mark.processor("gpu")
@pytest.mark.model("N/A")
@pytest.mark.multinode(2)
Expand Down
279 changes: 230 additions & 49 deletions test/test_utils/ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import logging
import sys
import uuid
import copy

from collections import Counter

from inspect import signature

Expand Down Expand Up @@ -407,6 +410,218 @@ def launch_instances_with_retry(
return instances


def launch_efa(ec2_client, ec2_instance_type, ec2_run_instances_definition, availability_zone):
    """
    Launch EFA-enabled instances in a single availability zone.

    The caller's run-instances definition is deep-copied before being modified, so it is never
    mutated. The copy receives the placement and the network interfaces generated for the
    requested availability zone.

    :param ec2_client: boto3 ec2 client
    :param ec2_instance_type: str instance type, forwarded to generate_network_interfaces
    :param ec2_run_instances_definition: dict of keyword arguments for ec2_client.run_instances
    :param availability_zone: str availability zone to launch in
    :return: list of launched instances taken from the run_instances response; an empty list
        when the response is empty or has no "Instances" entry
    """
    ec2_efa_run_instances_definition = copy.deepcopy(ec2_run_instances_definition)
    ec2_efa_run_instances_definition.update(
        {
            "Placement": {"AvailabilityZone": availability_zone},
            "NetworkInterfaces": generate_network_interfaces(
                ec2_client, ec2_instance_type, availability_zone
            ),
        }
    )
    response = ec2_client.run_instances(**ec2_efa_run_instances_definition) or {}
    # Callers concatenate this result onto a list (`instances += launch_efa(...)`), so a
    # missing "Instances" key must yield an empty list rather than None.
    return response.get("Instances") or []


def launch_efa_with_reservations(
    ec2_client, ec2_instance_type, reservations, ec2_run_instances_definition, fn_name=""
):
    """
    Try capacity reservations one after another until one of them yields launched instances.

    Reservations are consumed from the front of ``reservations``, so the caller's list is
    mutated. A ClientError raised for one reservation is logged at debug level and the next
    reservation is tried.

    :param ec2_client: boto3 ec2 client
    :param ec2_instance_type: str instance type
    :param reservations: list of dicts providing "CapacityReservationId" and "AvailabilityZone"
    :param ec2_run_instances_definition: dict of run_instances keyword arguments (deep-copied,
        the original is left untouched)
    :param fn_name: str function name for ease of logging
    :return: list of launched instances, or an empty list when no reservation produced any
    """
    reserved_definition = copy.deepcopy(ec2_run_instances_definition)

    while reservations:
        reservation = reservations.pop(0)
        reservation_target = {"CapacityReservationId": reservation["CapacityReservationId"]}
        reserved_definition["CapacityReservationSpecification"] = {
            "CapacityReservationTarget": reservation_target
        }
        try:
            launched = launch_efa(
                ec2_client,
                ec2_instance_type,
                reserved_definition,
                reservation["AvailabilityZone"],
            )
            if not launched:
                # Nothing came back from this reservation; move on to the next one.
                continue
            LOGGER.info(
                f"Your EFA reservation is ready for {fn_name}, please wait to be seated. Launching..."
            )
            if is_mainline_context():
                LOGGER.info(f"Launched EFA enabled instance for {fn_name} via {reservation}")
            return launched
        except ClientError as e:
            LOGGER.debug(
                f"Failed to launch EFA instance for {fn_name} from reservation due to {e}\n"
                "Checking additional open reservations..."
            )

    return []


def validate_efa_instance_conditions(instances, minimum_number_of_instances):
    """
    Check whether exactly the required number of instances has been launched.

    :param instances: list of launched instances
    :param minimum_number_of_instances: int number of instances required (acts as both the
        minimum and the maximum allowed)
    :return: True when the counts match exactly, False when too few instances were launched
    :raises RuntimeError: when more instances than required were launched
    """
    launched_count = len(instances)
    if launched_count > minimum_number_of_instances:
        raise RuntimeError(
            f"Launched too many instances somehow, raising and cleaning up - {instances}; min/max_allowed = {minimum_number_of_instances}"
        )
    return launched_count == minimum_number_of_instances


class HeterogenousReservationError(Exception):
    """Raised when launching EFA instances via the heterogenous reservation approach fails."""


def referesh_capacity_reservations(ec2_client, ec2_instance_type, az):
    """
    Re-query the available capacity reservations and narrow them to one availability zone.

    :param ec2_client: boto3 ec2 client
    :param ec2_instance_type: str instance type
    :param az: str availability zone to keep
    :return: tuple of (list of reservations in ``az``, sum of their "AvailableInstanceCount")
    """
    az_reservations = []
    available_instances = 0
    for reservation in get_available_reservations(ec2_client, ec2_instance_type):
        if reservation["AvailabilityZone"] != az:
            continue
        az_reservations.append(reservation)
        available_instances += reservation["AvailableInstanceCount"]
    return az_reservations, available_instances


def _terminate_efa_instances(ec2_client, instances):
    """Terminate the given launched instances (if any), logging their instance IDs first."""
    if not instances:
        return
    instance_ids = [instance_info["InstanceId"] for instance_info in instances]
    LOGGER.info(f"Cleaning up instances {instance_ids}...")
    ec2_client.terminate_instances(InstanceIds=instance_ids)


def launch_efa_with_heterogenous_reservations(ec2_client, ec2_run_instances_definition, fn_name=""):
    """
    Launch efa instances with heterogenous reservations

    Previous EFA launch code requires instances to be launched from the same command. This
    prohibits launching instances from multiple capacity reservations if the reservation has less
    than the minimum available instances required (typically 2).

    To remedy this, we group reservations by availability zone. If we have instances available in
    reservation, we group by most common availability zone and try to launch multiple instances
    from reservation, one instance per request. If we do not meet our minimum requirements, try
    launching the remainder from the public pool in the same availability zone. If we launch 0
    from reservation, do not try launching from the public pool, and allow other functions to
    handle launching exclusively from public.

    Instances launched in an availability zone that ends up short of the required count are
    terminated before the next availability zone is tried.

    Args:
        ec2_client (boto3.client): boto3 ec2 client
        ec2_run_instances_definition (dict): key/value pairs for run instances launch cmd; must
            contain "InstanceType" and "MinCount". It is deep-copied and never mutated.
        fn_name (str, optional): pytest function name. Defaults to "".

    Raises:
        HeterogenousReservationError: Custom error handling for function failure (any
            non-ClientError exception, raised after launched instances are cleaned up)

    Returns:
        list: launched instances; empty list if no availability zone could satisfy the request
    """
    ec2_heterogenous_run_instances_definition = copy.deepcopy(ec2_run_instances_definition)
    ec2_instance_type = ec2_heterogenous_run_instances_definition["InstanceType"]
    minimum_number_of_instances = ec2_heterogenous_run_instances_definition["MinCount"]

    # Reset max and min count to 1; we launch one instance per request so that the required
    # instances can be drawn from several different reservations.
    ec2_heterogenous_run_instances_definition["MaxCount"] = 1
    ec2_heterogenous_run_instances_definition["MinCount"] = 1

    reserved_azs = [
        reservation["AvailabilityZone"]
        for reservation in ec2_client.describe_capacity_reservations()["CapacityReservations"]
        if reservation["InstanceType"] == ec2_instance_type
    ]

    tmp_reservations = get_available_reservations(
        ec2_client=ec2_client,
        instance_type=ec2_instance_type,
        min_availability=ec2_heterogenous_run_instances_definition["MinCount"],
    )

    # Availability zones with the most available reservations are tried first
    az_counter = Counter(reservation["AvailabilityZone"] for reservation in tmp_reservations)
    az_priorities = [c[0] for c in az_counter.most_common()]

    # Track all reserved availability zones, in case capacity comes later
    for reserved_az in reserved_azs:
        if reserved_az not in az_priorities:
            az_priorities.append(reserved_az)

    for az in az_priorities:
        LOGGER.info(f"Checking AZ {az}")
        # Refresh reservations for each AZ
        reservations, available_instances = referesh_capacity_reservations(
            ec2_client, ec2_instance_type, az
        )
        # The public-pool fallback below may have raised these counts for a previous AZ
        ec2_heterogenous_run_instances_definition["MaxCount"] = 1
        ec2_heterogenous_run_instances_definition["MinCount"] = 1
        instances = []
        try:
            while available_instances and len(instances) < minimum_number_of_instances:
                LOGGER.info(f"trying to launch {ec2_instance_type} in {az}")
                launched = launch_efa_with_reservations(
                    ec2_client=ec2_client,
                    ec2_instance_type=ec2_instance_type,
                    reservations=reservations,
                    ec2_run_instances_definition=ec2_heterogenous_run_instances_definition,
                    fn_name=fn_name,
                )
                if not launched:
                    # Reservations still report availability but none of them would launch.
                    # Stop here instead of re-polling forever; fall through to the public
                    # pool / next availability zone handling below.
                    break
                instances += launched

                # Refresh reservations after every successful launch
                reservations, available_instances = referesh_capacity_reservations(
                    ec2_client, ec2_instance_type, az
                )

            if validate_efa_instance_conditions(instances, minimum_number_of_instances):
                LOGGER.info("Strung together some reservations, let's go")
                return instances

            # If we have remaining instances, try launching from public pool
            # Try a different availability zone if we don't have any reservation launches, however. Always
            # prioritize reservation launches in this function.
            remaining_instances = minimum_number_of_instances - len(instances)
            if remaining_instances != minimum_number_of_instances:
                LOGGER.info(
                    f"Have {remaining_instances} remaining_instances instances in {az}. Trying from public pool."
                )
                ec2_heterogenous_run_instances_definition["MaxCount"] = remaining_instances
                ec2_heterogenous_run_instances_definition["MinCount"] = remaining_instances
                instances += launch_efa(
                    ec2_client, ec2_instance_type, ec2_heterogenous_run_instances_definition, az
                )

            if validate_efa_instance_conditions(instances, minimum_number_of_instances):
                LOGGER.info("Strung together some reservations and some walk-ins, let's go")
                return instances

            # Clean up instances if this workflow did not succeed
            LOGGER.info(
                f"Failed to launch enough instances from public and reservations for {fn_name}."
            )
            _terminate_efa_instances(ec2_client, instances)

        except ClientError as e:
            # Clean up any remaining instances, then move on to the next availability zone
            LOGGER.info(
                f"Failed to launch EFA instance for {fn_name} from reservation due to {e}\n"
                "Checking additional open reservations and cleaning up stray resources"
            )
            _terminate_efa_instances(ec2_client, instances)

        except Exception as e:
            # Anything unexpected (including launching too many instances) aborts the workflow
            _terminate_efa_instances(ec2_client, instances)
            raise HeterogenousReservationError("Failed to launch via heterogenous approach") from e
    return []


@retry(
reraise=True,
stop=stop_after_delay(30 * 60), # Keep retrying for 30 minutes
Expand All @@ -429,75 +644,41 @@ def launch_efa_instances_with_retry(
:param fn_name: string - function name for ease of logging
:return: list of launched instance dicts (the "Instances" entries of the ec2_client.run_instances response)
"""
response = None
region = ec2_client.meta.region_name
reservations = get_available_reservations(
LOGGER.info(f"Trying to launch {ec2_instance_type} for {fn_name} via capacity reservation...")

heterogenous_reservation_launch = launch_efa_with_heterogenous_reservations(
ec2_client=ec2_client,
instance_type=ec2_run_instances_definition["InstanceType"],
min_availability=ec2_run_instances_definition["MinCount"],
ec2_run_instances_definition=ec2_run_instances_definition,
fn_name=fn_name,
)

# Try launching via reservation first
while reservations:
reservation = reservations.pop(0)
ec2_run_instances_definition["CapacityReservationSpecification"] = {
"CapacityReservationTarget": {
"CapacityReservationId": reservation["CapacityReservationId"]
}
}
ec2_run_instances_definition.update(
{
"Placement": {"AvailabilityZone": reservation["AvailabilityZone"]},
"NetworkInterfaces": generate_network_interfaces(
ec2_client, ec2_instance_type, reservation["AvailabilityZone"]
),
}
)
try:
response = ec2_client.run_instances(**ec2_run_instances_definition)
if response and response["Instances"]:
LOGGER.info(
f"Your EFA reservation is ready for {fn_name}, please wait to be seated. Launching..."
)
if is_mainline_context():
LOGGER.info(f"Launched EFA enabled instance for {fn_name} via {reservation}")
return response
except ClientError as e:
LOGGER.debug(
f"Failed to launch EFA instance for {fn_name} from reservation due to {e}\n"
"Checking additional open reservations..."
)
if heterogenous_reservation_launch:
return heterogenous_reservation_launch

# Clean up capacity reservation if it failed
ec2_run_instances_definition.pop("CapacityReservationSpecification", None)
LOGGER.info(
f"Looks like you didn't have an EFA reservation for {fn_name}, let's see if we can seat you as a walk-in..."
)

instances = []
for availability_zone in availability_zone_options:
ec2_run_instances_definition.update(
{
"Placement": {"AvailabilityZone": availability_zone},
"NetworkInterfaces": generate_network_interfaces(
ec2_client, ec2_instance_type, availability_zone
),
}
)
try:
response = ec2_client.run_instances(**ec2_run_instances_definition)
if response and response["Instances"]:
instances = launch_efa(
ec2_client, ec2_instance_type, ec2_run_instances_definition, availability_zone
)
if instances:
break
except ClientError as e:
LOGGER.debug(
LOGGER.info(
f"Failed to launch in {availability_zone} for {fn_name} due to {e}\n"
"Retrying in the next availability zone."
)
continue
if not (response and response["Instances"]):
if not instances:
raise RuntimeError(
f"Unable to launch {ec2_instance_type} instances in {region} for {fn_name}"
)
return response
return instances


def get_ec2_client(region):
Expand Down

0 comments on commit 6adcb21

Please sign in to comment.