Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KubeRay] support suspending worker groups in KubeRay autoscaler #49768

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -213,16 +213,18 @@ def _node_type_from_group_spec(
if is_head:
# The head node type has no workers because the head is not a worker.
min_workers = max_workers = 0
suspend = False
else:
# `minReplicas` and `maxReplicas` are required fields for each workerGroupSpec
min_workers = group_spec["minReplicas"]
max_workers = group_spec["maxReplicas"]
suspend = group_spec.get("suspend", False)

resources = _get_ray_resources_from_group_spec(group_spec, is_head)

node_type = {
"min_workers": min_workers,
"max_workers": max_workers,
"min_workers": min_workers if not suspend else 0,
"max_workers": max_workers if not suspend else 0,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to let the autoscaler know that the suspended worker group should have no workers; otherwise, the autoscaler will keep trying to scale it up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the available_node_types definition does not provide similar functionality (suspending a node type), the best way to do that is to set its max_workers and min_workers to 0.

# `node_config` is a legacy field required for compatibility.
# Pod config data is required by the operator but not by the autoscaler.
"node_config": {},
Expand Down
26 changes: 26 additions & 0 deletions python/ray/tests/kuberay/test_autoscaling_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,16 @@ def _get_basic_autoscaling_config() -> dict:
}


def _get_autoscaling_config_with_groups_suspended() -> dict:
"""The expected autoscaling with all groups suspended."""
config = _get_basic_autoscaling_config()
for _, spec in config["available_node_types"].items():
spec["max_workers"] = 0
spec["min_workers"] = 0
config["max_workers"] = 0
return config


def _get_ray_cr_no_cpu_error() -> dict:
"""Incorrectly formatted Ray CR without num-cpus rayStartParam and without resource
limits. Autoscaler should raise an error when reading this.
Expand Down Expand Up @@ -236,6 +246,14 @@ def _get_ray_cr_with_only_requests() -> dict:
return cr


def _get_ray_cr_with_groups_suspended() -> dict:
"""CR with all worker groups suspended"""
cr = get_basic_ray_cr()
for group in cr["spec"]["workerGroupSpecs"]:
group["suspend"] = True
return cr


def _get_autoscaling_config_with_options() -> dict:
config = _get_basic_autoscaling_config()
config["upscaling_speed"] = 1
Expand Down Expand Up @@ -312,6 +330,14 @@ def test_resource_quantity(input: str, output: int):
None,
id="autoscaler-options",
),
pytest.param(
_get_ray_cr_with_groups_suspended(),
_get_autoscaling_config_with_groups_suspended(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This tests both max_workers and min_workers are set to 0 in the generated autoscaling config.

None,
None,
None,
id="groups-suspended",
),
pytest.param(
_get_ray_cr_with_tpu_custom_resource(),
_get_basic_autoscaling_config(),
Expand Down
Loading