diff --git a/modin/config/__init__.py b/modin/config/__init__.py index 7a05e6b01e2..d2f590549c3 100644 --- a/modin/config/__init__.py +++ b/modin/config/__init__.py @@ -47,8 +47,10 @@ ProgressBar, RangePartitioning, RangePartitioningGroupby, + RayInitCustomResources, RayRedisAddress, RayRedisPassword, + RayTaskCustomResources, ReadSqlEngine, StorageFormat, TestDatasetSize, @@ -75,6 +77,8 @@ "IsRayCluster", "RayRedisAddress", "RayRedisPassword", + "RayInitCustomResources", + "RayTaskCustomResources", "LazyExecution", # Dask specific "DaskThreadsPerWorker", diff --git a/modin/config/envvars.py b/modin/config/envvars.py index 28c0bac8949..d18a0a78924 100644 --- a/modin/config/envvars.py +++ b/modin/config/envvars.py @@ -295,6 +295,49 @@ class RayRedisPassword(EnvironmentVariable, type=ExactStr): default = secrets.token_hex(32) +class RayInitCustomResources(EnvironmentVariable, type=dict): + """ + Ray node's custom resources to initialize with. + + Visit Ray documentation for more details: + https://docs.ray.io/en/latest/ray-core/scheduling/resources.html#custom-resources + + Notes + ----- + If you rely on Modin to initialize Ray, set this config + so that Ray is initialized with the custom resources. + """ + + varname = "MODIN_RAY_INIT_CUSTOM_RESOURCES" + default = None + + +class RayTaskCustomResources(EnvironmentVariable, type=dict): + """ + Custom resources of a Ray node to request in tasks or actors. + + Visit Ray documentation for more details: + https://docs.ray.io/en/latest/ray-core/scheduling/resources.html#custom-resources + + Notes + ----- + You can use this config to limit the parallelism for the entire workflow + by setting the config at the very beginning. + >>> import modin.config as cfg + >>> cfg.RayTaskCustomResources.put({"special_hardware": 0.001}) + This way each single remote task or actor will require 0.001 of "special_hardware" to run. 
+ You can also use this config to limit the parallelism for a certain operation + by setting the config with context. + >>> with context(RayTaskCustomResources={"special_hardware": 0.001}): + ... df.<op> + This way each single remote task or actor will require 0.001 of "special_hardware" to run + within the context only. + """ + + varname = "MODIN_RAY_TASK_CUSTOM_RESOURCES" + default = None + + class CpuCount(EnvironmentVariable, type=int): """How many CPU cores to use during initialization of the Modin engine.""" diff --git a/modin/core/execution/ray/common/deferred_execution.py b/modin/core/execution/ray/common/deferred_execution.py index 0ace6c27b10..2d296156bea 100644 --- a/modin/core/execution/ray/common/deferred_execution.py +++ b/modin/core/execution/ray/common/deferred_execution.py @@ -32,6 +32,7 @@ from ray._private.services import get_node_ip_address from ray.util.client.common import ClientObjectRef +from modin.config import RayTaskCustomResources from modin.core.execution.ray.common import MaterializationHook, RayWrapper from modin.logging import get_logger @@ -155,9 +156,9 @@ def exec( and self.flat_kwargs and self.num_returns == 1 ): - result, length, width, ip = remote_exec_func.remote( - self.func, self.data, *self.args, **self.kwargs - ) + result, length, width, ip = remote_exec_func.options( + resources=RayTaskCustomResources.get() + ).remote(self.func, self.data, *self.args, **self.kwargs) meta = MetaList([length, width, ip]) self._set_result(result, meta, 0) return result, meta, 0 @@ -435,11 +436,13 @@ def _remote_exec_chain(num_returns: int, *args: Tuple) -> List[Any]: # Prefer _remote_exec_single_chain(). It has fewer arguments and # does not require the num_returns to be specified in options. 
if num_returns == 2: - return _remote_exec_single_chain.remote(*args) + return _remote_exec_single_chain.options( + resources=RayTaskCustomResources.get() + ).remote(*args) else: - return _remote_exec_multi_chain.options(num_returns=num_returns).remote( - num_returns, *args - ) + return _remote_exec_multi_chain.options( + num_returns=num_returns, resources=RayTaskCustomResources.get() + ).remote(num_returns, *args) def _set_result( self, diff --git a/modin/core/execution/ray/common/engine_wrapper.py b/modin/core/execution/ray/common/engine_wrapper.py index fc5f8a643d2..930941de701 100644 --- a/modin/core/execution/ray/common/engine_wrapper.py +++ b/modin/core/execution/ray/common/engine_wrapper.py @@ -25,6 +25,7 @@ import ray from ray.util.client.common import ClientObjectRef +from modin.config import RayTaskCustomResources from modin.error_message import ErrorMessage @@ -78,9 +79,9 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1): """ args = [] if f_args is None else f_args kwargs = {} if f_kwargs is None else f_kwargs - return _deploy_ray_func.options(num_returns=num_returns).remote( - func, *args, **kwargs - ) + return _deploy_ray_func.options( + num_returns=num_returns, resources=RayTaskCustomResources.get() + ).remote(func, *args, **kwargs) @classmethod def is_future(cls, item): diff --git a/modin/core/execution/ray/common/utils.py b/modin/core/execution/ray/common/utils.py index 346ff8fcda3..85d09e17850 100644 --- a/modin/core/execution/ray/common/utils.py +++ b/modin/core/execution/ray/common/utils.py @@ -31,6 +31,7 @@ IsRayCluster, Memory, NPartitions, + RayInitCustomResources, RayRedisAddress, RayRedisPassword, StorageFormat, @@ -126,6 +127,7 @@ def initialize_ray( "object_store_memory": object_store_memory, "_redis_password": redis_password, "_memory": object_store_memory, + "resources": RayInitCustomResources.get(), **extra_init_kw, } # It should be enough to simply set the required variables for the main process diff --git 
a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py index 94a3c9120bf..ec88dda60d9 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -19,6 +19,7 @@ from pandas.io.common import get_handle, stringify_path from ray.data import from_pandas_refs +from modin.config import RayTaskCustomResources from modin.core.execution.ray.common import RayWrapper, SignalActor from modin.core.execution.ray.generic.io import RayIO from modin.core.io import ( @@ -188,7 +189,9 @@ def to_csv(cls, qc, **kwargs): if not cls._to_csv_check_support(kwargs): return RayIO.to_csv(qc, **kwargs) - signals = SignalActor.remote(len(qc._modin_frame._partitions) + 1) + signals = SignalActor.options(resources=RayTaskCustomResources.get()).remote( + len(qc._modin_frame._partitions) + 1 + ) def func(df, **kw): # pragma: no cover """ diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py index a4c35bf7e95..4d42c3b3bbd 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py @@ -21,7 +21,7 @@ if TYPE_CHECKING: from ray.util.client.common import ClientObjectRef -from modin.config import LazyExecution +from modin.config import LazyExecution, RayTaskCustomResources from modin.core.dataframe.pandas.partitioning.partition import PandasDataframePartition from modin.core.execution.ray.common import MaterializationHook, RayWrapper from modin.core.execution.ray.common.deferred_execution import ( @@ -270,9 +270,9 @@ def length(self, materialize=True): if (length := self._length_cache) is None: self.drain_call_queue() if (length := self._length_cache) is None: - length, self._width_cache = 
_get_index_and_columns.remote( - self._data_ref - ) + length, self._width_cache = _get_index_and_columns.options( + resources=RayTaskCustomResources.get() + ).remote(self._data_ref) self._length_cache = length if materialize and isinstance(length, ObjectIDType): self._length_cache = length = RayWrapper.materialize(length) @@ -297,9 +297,9 @@ def width(self, materialize=True): if (width := self._width_cache) is None: self.drain_call_queue() if (width := self._width_cache) is None: - self._length_cache, width = _get_index_and_columns.remote( - self._data_ref - ) + self._length_cache, width = _get_index_and_columns.options( + resources=RayTaskCustomResources.get() + ).remote(self._data_ref) self._width_cache = width if materialize and isinstance(width, ObjectIDType): self._width_cache = width = RayWrapper.materialize(width) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py index 91499e2ee8c..62903330ab9 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py @@ -17,6 +17,7 @@ import ray from ray.util import get_node_ip_address +from modin.config import RayTaskCustomResources from modin.core.dataframe.pandas.partitioning.axis_partition import ( PandasDataframeAxisPartition, ) @@ -115,6 +116,7 @@ def deploy_splitting_func( if extract_metadata else num_splits ), + resources=RayTaskCustomResources.get(), ).remote( cls._get_deploy_split_func(), *f_args, @@ -180,6 +182,7 @@ def deploy_axis_func( num_returns=(num_splits if lengths is None else len(lengths)) * (1 + cls._PARTITIONS_METADATA_LEN), **({"max_retries": max_retries} if max_retries is not None else {}), + resources=RayTaskCustomResources.get(), ).remote( cls._get_deploy_axis_func(), *f_args, @@ -240,7 +243,8 @@ def 
deploy_func_between_two_axis_partitions( A list of ``ray.ObjectRef``-s. """ return _deploy_ray_func.options( - num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN) + num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN), + resources=RayTaskCustomResources.get(), ).remote( PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions, *f_args, diff --git a/modin/tests/config/test_envvars.py b/modin/tests/config/test_envvars.py index df1e5ef58e3..34110e62014 100644 --- a/modin/tests/config/test_envvars.py +++ b/modin/tests/config/test_envvars.py @@ -19,6 +19,7 @@ from packaging import version import modin.config as cfg +import modin.pandas as pd from modin.config.envvars import _check_vars from modin.config.pubsub import _UNSET, ExactStr @@ -111,6 +112,16 @@ def test_doc_module(): assert pandas.read_table.__doc__ in pd.read_table.__doc__ +@pytest.mark.skipif(cfg.Engine.get() != "Ray", reason="Ray specific test") +def test_ray_cluster_resources(): + import ray + + cfg.RayInitCustomResources.put({"special_hardware": 1.0}) + # create a dummy df to initialize Ray engine + _ = pd.DataFrame([1, 2, 3]) + assert ray.cluster_resources()["special_hardware"] == 1.0 + + def test_hdk_envvar(): try: import pyhdk