Skip to content

Commit

Permalink
FEAT-#7202: Use custom resources for Ray (#7205)
Browse files Browse the repository at this point in the history
Signed-off-by: Igoshev, Iaroslav <[email protected]>
  • Loading branch information
YarShev authored Apr 23, 2024
1 parent 3abd961 commit 71b8da4
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 19 deletions.
4 changes: 4 additions & 0 deletions modin/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@
ProgressBar,
RangePartitioning,
RangePartitioningGroupby,
RayInitCustomResources,
RayRedisAddress,
RayRedisPassword,
RayTaskCustomResources,
ReadSqlEngine,
StorageFormat,
TestDatasetSize,
Expand All @@ -75,6 +77,8 @@
"IsRayCluster",
"RayRedisAddress",
"RayRedisPassword",
"RayInitCustomResources",
"RayTaskCustomResources",
"LazyExecution",
# Dask specific
"DaskThreadsPerWorker",
Expand Down
43 changes: 43 additions & 0 deletions modin/config/envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,49 @@ class RayRedisPassword(EnvironmentVariable, type=ExactStr):
default = secrets.token_hex(32)


class RayInitCustomResources(EnvironmentVariable, type=dict):
"""
Ray node's custom resources to initialize with.
Visit Ray documentation for more details:
https://docs.ray.io/en/latest/ray-core/scheduling/resources.html#custom-resources
Notes
-----
Relying on Modin to initialize Ray, you should set this config
for the proper initialization with custom resources.
"""

varname = "MODIN_RAY_INIT_CUSTOM_RESOURCES"
default = None


class RayTaskCustomResources(EnvironmentVariable, type=dict):
"""
Ray node's custom resources to request them in tasks or actors.
Visit Ray documentation for more details:
https://docs.ray.io/en/latest/ray-core/scheduling/resources.html#custom-resources
Notes
-----
You can use this config to limit the parallelism for the entire workflow
by setting the config at the very beginning.
>>> import modin.config as cfg
>>> cfg.RayTaskCustomResources.put({"special_hardware": 0.001})
This way each single remote task or actor will require 0.001 of "special_hardware" to run.
You can also use this config to limit the parallelism for a certain operation
by setting the config with context.
>>> with context(RayTaskCustomResources={"special_hardware": 0.001}):
... df.<op>
This way each single remote task or actor will require 0.001 of "special_hardware" to run
within the context only.
"""

varname = "MODIN_RAY_TASK_CUSTOM_RESOURCES"
default = None


class CpuCount(EnvironmentVariable, type=int):
"""How many CPU cores to use during initialization of the Modin engine."""

Expand Down
17 changes: 10 additions & 7 deletions modin/core/execution/ray/common/deferred_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from ray._private.services import get_node_ip_address
from ray.util.client.common import ClientObjectRef

from modin.config import RayTaskCustomResources
from modin.core.execution.ray.common import MaterializationHook, RayWrapper
from modin.logging import get_logger

Expand Down Expand Up @@ -155,9 +156,9 @@ def exec(
and self.flat_kwargs
and self.num_returns == 1
):
result, length, width, ip = remote_exec_func.remote(
self.func, self.data, *self.args, **self.kwargs
)
result, length, width, ip = remote_exec_func.options(
resources=RayTaskCustomResources.get()
).remote(self.func, self.data, *self.args, **self.kwargs)
meta = MetaList([length, width, ip])
self._set_result(result, meta, 0)
return result, meta, 0
Expand Down Expand Up @@ -435,11 +436,13 @@ def _remote_exec_chain(num_returns: int, *args: Tuple) -> List[Any]:
# Prefer _remote_exec_single_chain(). It has fewer arguments and
# does not require the num_returns to be specified in options.
if num_returns == 2:
return _remote_exec_single_chain.remote(*args)
return _remote_exec_single_chain.options(
resources=RayTaskCustomResources.get()
).remote(*args)
else:
return _remote_exec_multi_chain.options(num_returns=num_returns).remote(
num_returns, *args
)
return _remote_exec_multi_chain.options(
num_returns=num_returns, resources=RayTaskCustomResources.get()
).remote(num_returns, *args)

def _set_result(
self,
Expand Down
7 changes: 4 additions & 3 deletions modin/core/execution/ray/common/engine_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import ray
from ray.util.client.common import ClientObjectRef

from modin.config import RayTaskCustomResources
from modin.error_message import ErrorMessage


Expand Down Expand Up @@ -78,9 +79,9 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
"""
args = [] if f_args is None else f_args
kwargs = {} if f_kwargs is None else f_kwargs
return _deploy_ray_func.options(num_returns=num_returns).remote(
func, *args, **kwargs
)
return _deploy_ray_func.options(
num_returns=num_returns, resources=RayTaskCustomResources.get()
).remote(func, *args, **kwargs)

@classmethod
def is_future(cls, item):
Expand Down
2 changes: 2 additions & 0 deletions modin/core/execution/ray/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
IsRayCluster,
Memory,
NPartitions,
RayInitCustomResources,
RayRedisAddress,
RayRedisPassword,
StorageFormat,
Expand Down Expand Up @@ -126,6 +127,7 @@ def initialize_ray(
"object_store_memory": object_store_memory,
"_redis_password": redis_password,
"_memory": object_store_memory,
"resources": RayInitCustomResources.get(),
**extra_init_kw,
}
# It should be enough to simply set the required variables for the main process
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from pandas.io.common import get_handle, stringify_path
from ray.data import from_pandas_refs

from modin.config import RayTaskCustomResources
from modin.core.execution.ray.common import RayWrapper, SignalActor
from modin.core.execution.ray.generic.io import RayIO
from modin.core.io import (
Expand Down Expand Up @@ -188,7 +189,9 @@ def to_csv(cls, qc, **kwargs):
if not cls._to_csv_check_support(kwargs):
return RayIO.to_csv(qc, **kwargs)

signals = SignalActor.remote(len(qc._modin_frame._partitions) + 1)
signals = SignalActor.options(resources=RayTaskCustomResources.get()).remote(
len(qc._modin_frame._partitions) + 1
)

def func(df, **kw): # pragma: no cover
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
if TYPE_CHECKING:
from ray.util.client.common import ClientObjectRef

from modin.config import LazyExecution
from modin.config import LazyExecution, RayTaskCustomResources
from modin.core.dataframe.pandas.partitioning.partition import PandasDataframePartition
from modin.core.execution.ray.common import MaterializationHook, RayWrapper
from modin.core.execution.ray.common.deferred_execution import (
Expand Down Expand Up @@ -270,9 +270,9 @@ def length(self, materialize=True):
if (length := self._length_cache) is None:
self.drain_call_queue()
if (length := self._length_cache) is None:
length, self._width_cache = _get_index_and_columns.remote(
self._data_ref
)
length, self._width_cache = _get_index_and_columns.options(
resources=RayTaskCustomResources.get()
).remote(self._data_ref)
self._length_cache = length
if materialize and isinstance(length, ObjectIDType):
self._length_cache = length = RayWrapper.materialize(length)
Expand All @@ -297,9 +297,9 @@ def width(self, materialize=True):
if (width := self._width_cache) is None:
self.drain_call_queue()
if (width := self._width_cache) is None:
self._length_cache, width = _get_index_and_columns.remote(
self._data_ref
)
self._length_cache, width = _get_index_and_columns.options(
resources=RayTaskCustomResources.get()
).remote(self._data_ref)
self._width_cache = width
if materialize and isinstance(width, ObjectIDType):
self._width_cache = width = RayWrapper.materialize(width)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import ray
from ray.util import get_node_ip_address

from modin.config import RayTaskCustomResources
from modin.core.dataframe.pandas.partitioning.axis_partition import (
PandasDataframeAxisPartition,
)
Expand Down Expand Up @@ -115,6 +116,7 @@ def deploy_splitting_func(
if extract_metadata
else num_splits
),
resources=RayTaskCustomResources.get(),
).remote(
cls._get_deploy_split_func(),
*f_args,
Expand Down Expand Up @@ -180,6 +182,7 @@ def deploy_axis_func(
num_returns=(num_splits if lengths is None else len(lengths))
* (1 + cls._PARTITIONS_METADATA_LEN),
**({"max_retries": max_retries} if max_retries is not None else {}),
resources=RayTaskCustomResources.get(),
).remote(
cls._get_deploy_axis_func(),
*f_args,
Expand Down Expand Up @@ -240,7 +243,8 @@ def deploy_func_between_two_axis_partitions(
A list of ``ray.ObjectRef``-s.
"""
return _deploy_ray_func.options(
num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN)
num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN),
resources=RayTaskCustomResources.get(),
).remote(
PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions,
*f_args,
Expand Down
11 changes: 11 additions & 0 deletions modin/tests/config/test_envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from packaging import version

import modin.config as cfg
import modin.pandas as pd
from modin.config.envvars import _check_vars
from modin.config.pubsub import _UNSET, ExactStr

Expand Down Expand Up @@ -111,6 +112,16 @@ def test_doc_module():
assert pandas.read_table.__doc__ in pd.read_table.__doc__


@pytest.mark.skipif(cfg.Engine.get() != "Ray", reason="Ray specific test")
def test_ray_cluster_resources():
import ray

cfg.RayInitCustomResources.put({"special_hardware": 1.0})
# create a dummy df to initialize Ray engine
_ = pd.DataFrame([1, 2, 3])
assert ray.cluster_resources()["special_hardware"] == 1.0


def test_hdk_envvar():
try:
import pyhdk
Expand Down

0 comments on commit 71b8da4

Please sign in to comment.