Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#7202: Use custom resources for Ray #7205

Merged
merged 7 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions modin/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@
ProgressBar,
RangePartitioning,
RangePartitioningGroupby,
RayInitCustomResources,
RayRedisAddress,
RayRedisPassword,
RayTaskCustomResources,
ReadSqlEngine,
StorageFormat,
TestDatasetSize,
Expand All @@ -75,6 +77,8 @@
"IsRayCluster",
"RayRedisAddress",
"RayRedisPassword",
"RayInitCustomResources",
"RayTaskCustomResources",
"LazyExecution",
# Dask specific
"DaskThreadsPerWorker",
Expand Down
43 changes: 43 additions & 0 deletions modin/config/envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,49 @@ class RayRedisPassword(EnvironmentVariable, type=ExactStr):
default = secrets.token_hex(32)


class RayInitCustomResources(EnvironmentVariable, type=dict):
"""
Ray node's custom resources to initialize with.

Visit Ray documentation for more details:
https://docs.ray.io/en/latest/ray-core/scheduling/resources.html#custom-resources

Notes
-----
Relying on Modin to initialize Ray, you should set this config
for the proper initialization with custom resources.
"""

varname = "MODIN_RAY_INIT_CUSTOM_RESOURCES"
default = None


class RayTaskCustomResources(EnvironmentVariable, type=dict):
"""
Ray node's custom resources to request them in tasks or actors.

Visit Ray documentation for more details:
https://docs.ray.io/en/latest/ray-core/scheduling/resources.html#custom-resources

Notes
-----
You can use this config to limit the parallelism for the entire workflow
by setting the config at the very beginning.
>>> import modin.config as cfg
>>> cfg.RayTaskCustomResources.put({"special_hardware": 0.001})
This way each single remote task or actor will require 0.001 of "special_hardware" to run.
You can also use this config to limit the parallelism for a certain operation
by setting the config with context.
>>> with context(RayTaskCustomResources={"special_hardware": 0.001}):
... df.<op>
Comment on lines +331 to +332
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's good that there is now an option to limit concurrency, but this only works for Ray. Let's create an issue for the rest of the engines.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This config is not generic but only specific to Ray. I am not sure if there is a way to limit concurrency for other engines. I think if we will want to have something similar for other engines, we will open an issue and explore options if they are there. Do you still think we should create an issue now?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Limiting concurrency in context looks like a good feature for an advanced user. We can create a low priority issue, but now.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This way each single remote task or actor will require 0.001 of "special_hardware" to run
within the context only.
"""

varname = "MODIN_RAY_TASK_CUSTOM_RESOURCES"
default = None


class CpuCount(EnvironmentVariable, type=int):
"""How many CPU cores to use during initialization of the Modin engine."""

Expand Down
17 changes: 10 additions & 7 deletions modin/core/execution/ray/common/deferred_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from ray._private.services import get_node_ip_address
from ray.util.client.common import ClientObjectRef

from modin.config import RayTaskCustomResources
from modin.core.execution.ray.common import MaterializationHook, RayWrapper
from modin.logging import get_logger

Expand Down Expand Up @@ -155,9 +156,9 @@ def exec(
and self.flat_kwargs
and self.num_returns == 1
):
result, length, width, ip = remote_exec_func.remote(
self.func, self.data, *self.args, **self.kwargs
)
result, length, width, ip = remote_exec_func.options(
resources=RayTaskCustomResources.get()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should just call it RayTaskResources? (the same for RayInitResources) Since this config is used to pass values to resources.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to be explicit here as Ray itself calls it as custom resources - https://docs.ray.io/en/latest/ray-core/scheduling/resources.html#custom-resources.

).remote(self.func, self.data, *self.args, **self.kwargs)
meta = MetaList([length, width, ip])
self._set_result(result, meta, 0)
return result, meta, 0
Expand Down Expand Up @@ -435,11 +436,13 @@ def _remote_exec_chain(num_returns: int, *args: Tuple) -> List[Any]:
# Prefer _remote_exec_single_chain(). It has fewer arguments and
# does not require the num_returns to be specified in options.
if num_returns == 2:
return _remote_exec_single_chain.remote(*args)
return _remote_exec_single_chain.options(
resources=RayTaskCustomResources.get()
).remote(*args)
else:
return _remote_exec_multi_chain.options(num_returns=num_returns).remote(
num_returns, *args
)
return _remote_exec_multi_chain.options(
num_returns=num_returns, resources=RayTaskCustomResources.get()
).remote(num_returns, *args)

def _set_result(
self,
Expand Down
7 changes: 4 additions & 3 deletions modin/core/execution/ray/common/engine_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import ray
from ray.util.client.common import ClientObjectRef

from modin.config import RayTaskCustomResources
from modin.error_message import ErrorMessage


Expand Down Expand Up @@ -78,9 +79,9 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
"""
args = [] if f_args is None else f_args
kwargs = {} if f_kwargs is None else f_kwargs
return _deploy_ray_func.options(num_returns=num_returns).remote(
func, *args, **kwargs
)
return _deploy_ray_func.options(
num_returns=num_returns, resources=RayTaskCustomResources.get()
).remote(func, *args, **kwargs)

@classmethod
def is_future(cls, item):
Expand Down
2 changes: 2 additions & 0 deletions modin/core/execution/ray/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
IsRayCluster,
Memory,
NPartitions,
RayInitCustomResources,
RayRedisAddress,
RayRedisPassword,
StorageFormat,
Expand Down Expand Up @@ -126,6 +127,7 @@ def initialize_ray(
"object_store_memory": object_store_memory,
"_redis_password": redis_password,
"_memory": object_store_memory,
"resources": RayInitCustomResources.get(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test for this case?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What exactly would you like to test with this?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not test the situation when this config is different from None.

**extra_init_kw,
}
# It should be enough to simply set the required variables for the main process
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from pandas.io.common import get_handle, stringify_path
from ray.data import from_pandas_refs

from modin.config import RayTaskCustomResources
from modin.core.execution.ray.common import RayWrapper, SignalActor
from modin.core.execution.ray.generic.io import RayIO
from modin.core.io import (
Expand Down Expand Up @@ -188,7 +189,9 @@ def to_csv(cls, qc, **kwargs):
if not cls._to_csv_check_support(kwargs):
return RayIO.to_csv(qc, **kwargs)

signals = SignalActor.remote(len(qc._modin_frame._partitions) + 1)
signals = SignalActor.options(resources=RayTaskCustomResources.get()).remote(
len(qc._modin_frame._partitions) + 1
)

def func(df, **kw): # pragma: no cover
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
if TYPE_CHECKING:
from ray.util.client.common import ClientObjectRef

from modin.config import LazyExecution
from modin.config import LazyExecution, RayTaskCustomResources
from modin.core.dataframe.pandas.partitioning.partition import PandasDataframePartition
from modin.core.execution.ray.common import MaterializationHook, RayWrapper
from modin.core.execution.ray.common.deferred_execution import (
Expand Down Expand Up @@ -270,9 +270,9 @@ def length(self, materialize=True):
if (length := self._length_cache) is None:
self.drain_call_queue()
if (length := self._length_cache) is None:
length, self._width_cache = _get_index_and_columns.remote(
self._data_ref
)
length, self._width_cache = _get_index_and_columns.options(
resources=RayTaskCustomResources.get()
).remote(self._data_ref)
self._length_cache = length
if materialize and isinstance(length, ObjectIDType):
self._length_cache = length = RayWrapper.materialize(length)
Expand All @@ -297,9 +297,9 @@ def width(self, materialize=True):
if (width := self._width_cache) is None:
self.drain_call_queue()
if (width := self._width_cache) is None:
self._length_cache, width = _get_index_and_columns.remote(
self._data_ref
)
self._length_cache, width = _get_index_and_columns.options(
resources=RayTaskCustomResources.get()
).remote(self._data_ref)
self._width_cache = width
if materialize and isinstance(width, ObjectIDType):
self._width_cache = width = RayWrapper.materialize(width)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import ray
from ray.util import get_node_ip_address

from modin.config import RayTaskCustomResources
from modin.core.dataframe.pandas.partitioning.axis_partition import (
PandasDataframeAxisPartition,
)
Expand Down Expand Up @@ -115,6 +116,7 @@ def deploy_splitting_func(
if extract_metadata
else num_splits
),
resources=RayTaskCustomResources.get(),
).remote(
cls._get_deploy_split_func(),
*f_args,
Expand Down Expand Up @@ -180,6 +182,7 @@ def deploy_axis_func(
num_returns=(num_splits if lengths is None else len(lengths))
* (1 + cls._PARTITIONS_METADATA_LEN),
**({"max_retries": max_retries} if max_retries is not None else {}),
resources=RayTaskCustomResources.get(),
).remote(
cls._get_deploy_axis_func(),
*f_args,
Expand Down Expand Up @@ -240,7 +243,8 @@ def deploy_func_between_two_axis_partitions(
A list of ``ray.ObjectRef``-s.
"""
return _deploy_ray_func.options(
num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN)
num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN),
resources=RayTaskCustomResources.get(),
).remote(
PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions,
*f_args,
Expand Down
11 changes: 11 additions & 0 deletions modin/tests/config/test_envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from packaging import version

import modin.config as cfg
import modin.pandas as pd
from modin.config.envvars import _check_vars
from modin.config.pubsub import _UNSET, ExactStr

Expand Down Expand Up @@ -84,7 +85,7 @@
def test_doc_module():
import pandas

import modin.pandas as pd

Check notice

Code scanning / CodeQL

Module is imported more than once Note test

This import of module modin.pandas is redundant, as it was previously imported
on line 19
.
from modin.config import DocModule

DocModule.put("modin.tests.config.docs_module")
Expand All @@ -111,6 +112,16 @@
assert pandas.read_table.__doc__ in pd.read_table.__doc__


@pytest.mark.skipif(cfg.Engine.get() != "Ray", reason="Ray specific test")
def test_ray_cluster_resources():
import ray

cfg.RayInitCustomResources.put({"special_hardware": 1.0})
# create a dummy df to initialize Ray engine
_ = pd.DataFrame([1, 2, 3])
assert ray.cluster_resources()["special_hardware"] == 1.0


def test_hdk_envvar():
try:
import pyhdk
Expand Down
Loading