From aa839bdf7efca0ec02ac32e3f1f132d897805d97 Mon Sep 17 00:00:00 2001 From: "Igoshev, Iaroslav" Date: Fri, 19 Apr 2024 15:57:43 +0000 Subject: [PATCH 1/7] FEAT-#7202: Use custom resources for Ray Signed-off-by: Igoshev, Iaroslav --- modin/config/__init__.py | 2 ++ modin/config/envvars.py | 12 +++++++++++ .../ray/common/deferred_execution.py | 17 ++++++++------- .../execution/ray/common/engine_wrapper.py | 7 ++++--- modin/core/execution/ray/common/utils.py | 2 ++ .../implementations/pandas_on_ray/io/io.py | 21 +++++++++++++++---- .../pandas_on_ray/partitioning/partition.py | 14 ++++++------- .../partitioning/virtual_partition.py | 6 +++++- 8 files changed, 59 insertions(+), 22 deletions(-) diff --git a/modin/config/__init__.py b/modin/config/__init__.py index 7a05e6b01e2..d24b48618b0 100644 --- a/modin/config/__init__.py +++ b/modin/config/__init__.py @@ -47,6 +47,7 @@ ProgressBar, RangePartitioning, RangePartitioningGroupby, + RayCustomResources, RayRedisAddress, RayRedisPassword, ReadSqlEngine, @@ -75,6 +76,7 @@ "IsRayCluster", "RayRedisAddress", "RayRedisPassword", + "RayCustomResources", "LazyExecution", # Dask specific "DaskThreadsPerWorker", diff --git a/modin/config/envvars.py b/modin/config/envvars.py index 28c0bac8949..42504a9d627 100644 --- a/modin/config/envvars.py +++ b/modin/config/envvars.py @@ -295,6 +295,18 @@ class RayRedisPassword(EnvironmentVariable, type=ExactStr): default = secrets.token_hex(32) +class RayCustomResources(EnvironmentVariable, type=dict): + """ + Ray node's custom resources to request them in tasks or actors. + + Visit Ray documentation for more details: + https://docs.ray.io/en/latest/ray-core/scheduling/resources.html#custom-resources + """ + + varname = "MODIN_RAY_CUSTOM_RESOURCES" + default = None + + class CpuCount(EnvironmentVariable, type=int): """How many CPU cores to use during initialization of the Modin engine.""" diff --git a/modin/core/execution/ray/common/deferred_execution.py b/modin/core/execution/ray/common/deferred_execution.py index 0ace6c27b10..4763641b9cd 100644 --- a/modin/core/execution/ray/common/deferred_execution.py +++ b/modin/core/execution/ray/common/deferred_execution.py @@ -32,6 +32,7 @@ from ray._private.services import get_node_ip_address from ray.util.client.common import ClientObjectRef +from modin.config import RayCustomResources from modin.core.execution.ray.common import MaterializationHook, RayWrapper from modin.logging import get_logger @@ -155,9 +156,9 @@ def exec( and self.flat_kwargs and self.num_returns == 1 ): - result, length, width, ip = remote_exec_func.remote( - self.func, self.data, *self.args, **self.kwargs - ) + result, length, width, ip = remote_exec_func.options( + resources=RayCustomResources.get() + ).remote(self.func, self.data, *self.args, **self.kwargs) meta = MetaList([length, width, ip]) self._set_result(result, meta, 0) return result, meta, 0 @@ -435,11 +436,13 @@ def _remote_exec_chain(num_returns: int, *args: Tuple) -> List[Any]: # Prefer _remote_exec_single_chain(). It has fewer arguments and # does not require the num_returns to be specified in options. if num_returns == 2: - return _remote_exec_single_chain.remote(*args) + return _remote_exec_single_chain.options( + resources=RayCustomResources.get() + ).remote(*args) else: - return _remote_exec_multi_chain.options(num_returns=num_returns).remote( - num_returns, *args - ) + return _remote_exec_multi_chain.options( + num_returns=num_returns, resources=RayCustomResources.get() + ).remote(num_returns, *args) def _set_result( self, diff --git a/modin/core/execution/ray/common/engine_wrapper.py b/modin/core/execution/ray/common/engine_wrapper.py index fc5f8a643d2..6b44107f8a3 100644 --- a/modin/core/execution/ray/common/engine_wrapper.py +++ b/modin/core/execution/ray/common/engine_wrapper.py @@ -25,6 +25,7 @@ import ray from ray.util.client.common import ClientObjectRef +from modin.config import RayCustomResources from modin.error_message import ErrorMessage @@ -78,9 +79,9 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1): """ args = [] if f_args is None else f_args kwargs = {} if f_kwargs is None else f_kwargs - return _deploy_ray_func.options(num_returns=num_returns).remote( - func, *args, **kwargs - ) + return _deploy_ray_func.options( + num_returns=num_returns, resources=RayCustomResources.get() + ).remote(func, *args, **kwargs) @classmethod def is_future(cls, item): diff --git a/modin/core/execution/ray/common/utils.py b/modin/core/execution/ray/common/utils.py index 346ff8fcda3..834be5a9619 100644 --- a/modin/core/execution/ray/common/utils.py +++ b/modin/core/execution/ray/common/utils.py @@ -31,6 +31,7 @@ IsRayCluster, Memory, NPartitions, + RayCustomResources, RayRedisAddress, RayRedisPassword, StorageFormat, @@ -126,6 +127,7 @@ def initialize_ray( "object_store_memory": object_store_memory, "_redis_password": redis_password, "_memory": object_store_memory, + "resources": RayCustomResources.get(), **extra_init_kw, } # It should be enough to simply set the required variables for the main process diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py index 94a3c9120bf..5f8762ee425 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -19,6 +19,7 @@ from pandas.io.common import get_handle, stringify_path from ray.data import from_pandas_refs +from modin.config import RayCustomResources from modin.core.execution.ray.common import RayWrapper, SignalActor from modin.core.execution.ray.generic.io import RayIO from modin.core.io import ( @@ -188,7 +189,9 @@ def to_csv(cls, qc, **kwargs): if not cls._to_csv_check_support(kwargs): return RayIO.to_csv(qc, **kwargs) - signals = SignalActor.remote(len(qc._modin_frame._partitions) + 1) + signals = SignalActor.options(resources=RayCustomResources.get()).remote( + len(qc._modin_frame._partitions) + 1 + ) def func(df, **kw): # pragma: no cover """ @@ -225,7 +228,11 @@ def func(df, **kw): # pragma: no cover csv_kwargs["path_or_buf"].close() # each process waits for its turn to write to a file - RayWrapper.materialize(signals.wait.remote(partition_idx)) + RayWrapper.materialize( + signals.wait.options(resources=RayCustomResources.get()).remote( + partition_idx + ) + ) # preparing to write data from the buffer to a file with get_handle( @@ -242,12 +249,18 @@ def func(df, **kw): # pragma: no cover handles.handle.write(content) # signal that the next process can start writing to the file - RayWrapper.materialize(signals.send.remote(partition_idx + 1)) + RayWrapper.materialize( + signals.send.options(resources=RayCustomResources.get()).remote( + partition_idx + 1 + ) + ) # used for synchronization purposes return pandas.DataFrame() # signaling that the partition with id==0 can be written to the file - RayWrapper.materialize(signals.send.remote(0)) + RayWrapper.materialize( + signals.send.options(resources=RayCustomResources.get()).remote(0) + ) # Ensure that the metadata is syncrhonized qc._modin_frame._propagate_index_objs(axis=None) result = qc._modin_frame._partition_mgr_cls.map_axis_partitions( diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py index a4c35bf7e95..2072ac957cb 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py @@ -21,7 +21,7 @@ if TYPE_CHECKING: from ray.util.client.common import ClientObjectRef -from modin.config import LazyExecution +from modin.config import LazyExecution, RayCustomResources from modin.core.dataframe.pandas.partitioning.partition import PandasDataframePartition from modin.core.execution.ray.common import MaterializationHook, RayWrapper from modin.core.execution.ray.common.deferred_execution import ( @@ -270,9 +270,9 @@ def length(self, materialize=True): if (length := self._length_cache) is None: self.drain_call_queue() if (length := self._length_cache) is None: - length, self._width_cache = _get_index_and_columns.remote( - self._data_ref - ) + length, self._width_cache = _get_index_and_columns.options( + resources=RayCustomResources.get() + ).remote(self._data_ref) self._length_cache = length if materialize and isinstance(length, ObjectIDType): self._length_cache = length = RayWrapper.materialize(length) @@ -297,9 +297,9 @@ def width(self, materialize=True): if (width := self._width_cache) is None: self.drain_call_queue() if (width := self._width_cache) is None: - self._length_cache, width = _get_index_and_columns.remote( - self._data_ref - ) + self._length_cache, width = _get_index_and_columns.options( + resources=RayCustomResources.get() + ).remote(self._data_ref) self._width_cache = width if materialize and isinstance(width, ObjectIDType): self._width_cache = width = RayWrapper.materialize(width) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py index 91499e2ee8c..1feece8997b 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py @@ -17,6 +17,7 @@ import ray from ray.util import get_node_ip_address +from modin.config import RayCustomResources from modin.core.dataframe.pandas.partitioning.axis_partition import ( PandasDataframeAxisPartition, ) @@ -115,6 +116,7 @@ def deploy_splitting_func( if extract_metadata else num_splits ), + resources=RayCustomResources.get(), ).remote( cls._get_deploy_split_func(), *f_args, @@ -180,6 +182,7 @@ def deploy_axis_func( num_returns=(num_splits if lengths is None else len(lengths)) * (1 + cls._PARTITIONS_METADATA_LEN), **({"max_retries": max_retries} if max_retries is not None else {}), + resources=RayCustomResources.get(), ).remote( cls._get_deploy_axis_func(), *f_args, @@ -240,7 +243,8 @@ def deploy_func_between_two_axis_partitions( A list of ``ray.ObjectRef``-s. """ return _deploy_ray_func.options( - num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN) + num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN), + resources=RayCustomResources.get(), ).remote( PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions, *f_args, From c3dc911cf7e17d0f45fb68f6cc553247521129dd Mon Sep 17 00:00:00 2001 From: "Igoshev, Iaroslav" Date: Fri, 19 Apr 2024 16:08:04 +0000 Subject: [PATCH 2/7] Fix actors Signed-off-by: Igoshev, Iaroslav --- .../ray/implementations/pandas_on_ray/io/io.py | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py index 5f8762ee425..9d81930de21 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -228,11 +228,7 @@ def func(df, **kw): # pragma: no cover csv_kwargs["path_or_buf"].close() # each process waits for its turn to write to a file - RayWrapper.materialize( - signals.wait.options(resources=RayCustomResources.get()).remote( - partition_idx - ) - ) + RayWrapper.materialize(signals.wait.remote(partition_idx)) # preparing to write data from the buffer to a file with get_handle( @@ -249,18 +245,12 @@ def func(df, **kw): # pragma: no cover handles.handle.write(content) # signal that the next process can start writing to the file - RayWrapper.materialize( - signals.send.options(resources=RayCustomResources.get()).remote( - partition_idx + 1 - ) - ) + RayWrapper.materialize(signals.send.remote(partition_idx + 1)) # used for synchronization purposes return pandas.DataFrame() # signaling that the partition with id==0 can be written to the file - RayWrapper.materialize( - signals.send.options(resources=RayCustomResources.get()).remote(0) - ) + RayWrapper.materialize(signals.send.remote(0)) # Ensure that the metadata is syncrhonized qc._modin_frame._propagate_index_objs(axis=None) result = qc._modin_frame._partition_mgr_cls.map_axis_partitions( From 3b9e66545a237ec9a55fea8aad2a20d954dfc96f Mon Sep 17 00:00:00 2001 From: "Igoshev, Iaroslav" Date: Mon, 22 Apr 2024 14:13:24 +0000 Subject: [PATCH 3/7] Address comments Signed-off-by: Igoshev, Iaroslav --- modin/config/__init__.py | 6 ++-- modin/config/envvars.py | 35 +++++++++++++++++-- .../ray/common/deferred_execution.py | 8 ++--- .../execution/ray/common/engine_wrapper.py | 4 +-- modin/core/execution/ray/common/utils.py | 4 +-- .../implementations/pandas_on_ray/io/io.py | 4 +-- .../pandas_on_ray/partitioning/partition.py | 6 ++-- .../partitioning/virtual_partition.py | 8 ++--- 8 files changed, 54 insertions(+), 21 deletions(-) diff --git a/modin/config/__init__.py b/modin/config/__init__.py index d24b48618b0..ee917be3e5a 100644 --- a/modin/config/__init__.py +++ b/modin/config/__init__.py @@ -47,7 +47,8 @@ ProgressBar, RangePartitioning, RangePartitioningGroupby, - RayCustomResources, + RayInitCustomResources, + RayTaskCustomResources, RayRedisAddress, RayRedisPassword, ReadSqlEngine, @@ -76,7 +77,8 @@ "IsRayCluster", "RayRedisAddress", "RayRedisPassword", - "RayCustomResources", + "RayInitCustomResources", + "RayTaskCustomResources", "LazyExecution", # Dask specific "DaskThreadsPerWorker", diff --git a/modin/config/envvars.py b/modin/config/envvars.py index 42504a9d627..9516a2e9ce0 100644 --- a/modin/config/envvars.py +++ b/modin/config/envvars.py @@ -295,15 +295,46 @@ class RayRedisPassword(EnvironmentVariable, type=ExactStr): default = secrets.token_hex(32) -class RayCustomResources(EnvironmentVariable, type=dict): +class RayInitCustomResources(EnvironmentVariable, type=dict): + """ + Ray node's custom resources to initialize with. + + Visit Ray documentation for more details: + https://docs.ray.io/en/latest/ray-core/scheduling/resources.html#custom-resources + + Notes + ----- + Relying on Modin to initialize Ray, you should set this config + for the proper initialization with custom resources. + """ + + varname = "MODIN_RAY_INIT_CUSTOM_RESOURCES" + default = None + + +class RayTaskCustomResources(EnvironmentVariable, type=dict): """ Ray node's custom resources to request them in tasks or actors. Visit Ray documentation for more details: https://docs.ray.io/en/latest/ray-core/scheduling/resources.html#custom-resources + + Notes + ----- + You can use this config to limit the parallelism for the entire workflow + by setting the config at the very beginning. + >>> import modin.config as cfg + >>> cfg.RayTaskCustomResources.put({"special_hardware": 0.001}) + This way each single remote task or actor will require 0.001 of "special_hardware" to run. + You can also use this config to limit the parallelism for a certain operation + by setting the config with context. + >>> with context(RayTaskCustomResources={"special_hardware": 1.0}): + ... df. + This way each single remote task or actor will require 0.001 of "special_hardware" to run + within the context only. """ - varname = "MODIN_RAY_CUSTOM_RESOURCES" + varname = "MODIN_RAY_TASK_CUSTOM_RESOURCES" default = None diff --git a/modin/core/execution/ray/common/deferred_execution.py b/modin/core/execution/ray/common/deferred_execution.py index 4763641b9cd..2d296156bea 100644 --- a/modin/core/execution/ray/common/deferred_execution.py +++ b/modin/core/execution/ray/common/deferred_execution.py @@ -32,7 +32,7 @@ from ray._private.services import get_node_ip_address from ray.util.client.common import ClientObjectRef -from modin.config import RayCustomResources +from modin.config import RayTaskCustomResources from modin.core.execution.ray.common import MaterializationHook, RayWrapper from modin.logging import get_logger @@ -157,7 +157,7 @@ def exec( and self.num_returns == 1 ): result, length, width, ip = remote_exec_func.options( - resources=RayCustomResources.get() + resources=RayTaskCustomResources.get() ).remote(self.func, self.data, *self.args, **self.kwargs) meta = MetaList([length, width, ip]) self._set_result(result, meta, 0) @@ -437,11 +437,11 @@ def _remote_exec_chain(num_returns: int, *args: Tuple) -> List[Any]: # does not require the num_returns to be specified in options. if num_returns == 2: return _remote_exec_single_chain.options( - resources=RayCustomResources.get() + resources=RayTaskCustomResources.get() ).remote(*args) else: return _remote_exec_multi_chain.options( - num_returns=num_returns, resources=RayCustomResources.get() + num_returns=num_returns, resources=RayTaskCustomResources.get() ).remote(num_returns, *args) def _set_result( diff --git a/modin/core/execution/ray/common/engine_wrapper.py b/modin/core/execution/ray/common/engine_wrapper.py index 6b44107f8a3..930941de701 100644 --- a/modin/core/execution/ray/common/engine_wrapper.py +++ b/modin/core/execution/ray/common/engine_wrapper.py @@ -25,7 +25,7 @@ import ray from ray.util.client.common import ClientObjectRef -from modin.config import RayCustomResources +from modin.config import RayTaskCustomResources from modin.error_message import ErrorMessage @@ -80,7 +80,7 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1): args = [] if f_args is None else f_args kwargs = {} if f_kwargs is None else f_kwargs return _deploy_ray_func.options( - num_returns=num_returns, resources=RayCustomResources.get() + num_returns=num_returns, resources=RayTaskCustomResources.get() ).remote(func, *args, **kwargs) @classmethod diff --git a/modin/core/execution/ray/common/utils.py b/modin/core/execution/ray/common/utils.py index 834be5a9619..85d09e17850 100644 --- a/modin/core/execution/ray/common/utils.py +++ b/modin/core/execution/ray/common/utils.py @@ -31,7 +31,7 @@ IsRayCluster, Memory, NPartitions, - RayCustomResources, + RayInitCustomResources, RayRedisAddress, RayRedisPassword, StorageFormat, @@ -127,7 +127,7 @@ def initialize_ray( "object_store_memory": object_store_memory, "_redis_password": redis_password, "_memory": object_store_memory, - "resources": RayCustomResources.get(), + "resources": RayInitCustomResources.get(), **extra_init_kw, } # It should be enough to simply set the required variables for the main process diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py index 9d81930de21..ec88dda60d9 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -19,7 +19,7 @@ from pandas.io.common import get_handle, stringify_path from ray.data import from_pandas_refs -from modin.config import RayCustomResources +from modin.config import RayTaskCustomResources from modin.core.execution.ray.common import RayWrapper, SignalActor from modin.core.execution.ray.generic.io import RayIO from modin.core.io import ( @@ -189,7 +189,7 @@ def to_csv(cls, qc, **kwargs): if not cls._to_csv_check_support(kwargs): return RayIO.to_csv(qc, **kwargs) - signals = SignalActor.options(resources=RayCustomResources.get()).remote( + signals = SignalActor.options(resources=RayTaskCustomResources.get()).remote( len(qc._modin_frame._partitions) + 1 ) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py index 2072ac957cb..4d42c3b3bbd 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py @@ -21,7 +21,7 @@ if TYPE_CHECKING: from ray.util.client.common import ClientObjectRef -from modin.config import LazyExecution, RayCustomResources +from modin.config import LazyExecution, RayTaskCustomResources from modin.core.dataframe.pandas.partitioning.partition import PandasDataframePartition from modin.core.execution.ray.common import MaterializationHook, RayWrapper from modin.core.execution.ray.common.deferred_execution import ( @@ -271,7 +271,7 @@ def length(self, materialize=True): self.drain_call_queue() if (length := self._length_cache) is None: length, self._width_cache = _get_index_and_columns.options( - resources=RayCustomResources.get() + resources=RayTaskCustomResources.get() ).remote(self._data_ref) self._length_cache = length if materialize and isinstance(length, ObjectIDType): @@ -298,7 +298,7 @@ def width(self, materialize=True): self.drain_call_queue() if (width := self._width_cache) is None: self._length_cache, width = _get_index_and_columns.options( - resources=RayCustomResources.get() + resources=RayTaskCustomResources.get() ).remote(self._data_ref) self._width_cache = width if materialize and isinstance(width, ObjectIDType): diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py index 1feece8997b..62903330ab9 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py @@ -17,7 +17,7 @@ import ray from ray.util import get_node_ip_address -from modin.config import RayCustomResources +from modin.config import RayTaskCustomResources from modin.core.dataframe.pandas.partitioning.axis_partition import ( PandasDataframeAxisPartition, ) @@ -116,7 +116,7 @@ def deploy_splitting_func( if extract_metadata else num_splits ), - resources=RayCustomResources.get(), + resources=RayTaskCustomResources.get(), ).remote( cls._get_deploy_split_func(), *f_args, @@ -182,7 +182,7 @@ def deploy_axis_func( num_returns=(num_splits if lengths is None else len(lengths)) * (1 + cls._PARTITIONS_METADATA_LEN), **({"max_retries": max_retries} if max_retries is not None else {}), - resources=RayCustomResources.get(), + resources=RayTaskCustomResources.get(), ).remote( cls._get_deploy_axis_func(), *f_args, @@ -244,7 +244,7 @@ def deploy_func_between_two_axis_partitions( """ return _deploy_ray_func.options( num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN), - resources=RayCustomResources.get(), + resources=RayTaskCustomResources.get(), ).remote( PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions, *f_args, From 6bb9847bd5c6c407039309fac83a314fc5c36089 Mon Sep 17 00:00:00 2001 From: "Igoshev, Iaroslav" Date: Mon, 22 Apr 2024 15:32:57 +0000 Subject: [PATCH 4/7] Fix isort Signed-off-by: Igoshev, Iaroslav --- modin/config/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/config/__init__.py b/modin/config/__init__.py index ee917be3e5a..d2f590549c3 100644 --- a/modin/config/__init__.py +++ b/modin/config/__init__.py @@ -48,9 +48,9 @@ RangePartitioning, RangePartitioningGroupby, RayInitCustomResources, - RayTaskCustomResources, RayRedisAddress, RayRedisPassword, + RayTaskCustomResources, ReadSqlEngine, StorageFormat, TestDatasetSize, From c763161a28778d5b209ca89c409447766e8ca3c1 Mon Sep 17 00:00:00 2001 From: Iaroslav Igoshev Date: Mon, 22 Apr 2024 17:34:39 +0200 Subject: [PATCH 5/7] Update modin/config/envvars.py --- modin/config/envvars.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/config/envvars.py b/modin/config/envvars.py index 9516a2e9ce0..d18a0a78924 100644 --- a/modin/config/envvars.py +++ b/modin/config/envvars.py @@ -328,7 +328,7 @@ class RayTaskCustomResources(EnvironmentVariable, type=dict): This way each single remote task or actor will require 0.001 of "special_hardware" to run. You can also use this config to limit the parallelism for a certain operation by setting the config with context. - >>> with context(RayTaskCustomResources={"special_hardware": 1.0}): + >>> with context(RayTaskCustomResources={"special_hardware": 0.001}): ... df. This way each single remote task or actor will require 0.001 of "special_hardware" to run within the context only. From 36f5d7b0cf83c83a7b3c8b0cb999d682514bc593 Mon Sep 17 00:00:00 2001 From: "Igoshev, Iaroslav" Date: Mon, 22 Apr 2024 20:25:02 +0000 Subject: [PATCH 6/7] Add a test Signed-off-by: Igoshev, Iaroslav --- modin/tests/config/test_envvars.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/modin/tests/config/test_envvars.py b/modin/tests/config/test_envvars.py index df1e5ef58e3..debc5514853 100644 --- a/modin/tests/config/test_envvars.py +++ b/modin/tests/config/test_envvars.py @@ -18,6 +18,7 @@ import pytest from packaging import version +import modin.pandas as pd import modin.config as cfg from modin.config.envvars import _check_vars from modin.config.pubsub import _UNSET, ExactStr @@ -111,6 +112,16 @@ def test_doc_module(): assert pandas.read_table.__doc__ in pd.read_table.__doc__ +@pytest.mark.skipif(cfg.Engine.get() != "Ray", reason="Ray specific test") +def test_ray_cluster_resources(): + import ray + + cfg.RayInitCustomResources.put({"special_hardware": 1.0}) + # create a dummy df to initialize Ray engine + _ = pd.DataFrame([1, 2, 3]) + assert ray.cluster_resources()["special_hardware"] == 1.0 + + def test_hdk_envvar(): try: import pyhdk From ee077b3a01bd4dec3a37c79d121fd8b8f8cc0b0f Mon Sep 17 00:00:00 2001 From: "Igoshev, Iaroslav" Date: Mon, 22 Apr 2024 20:26:26 +0000 Subject: [PATCH 7/7] Fix isort Signed-off-by: Igoshev, Iaroslav --- modin/tests/config/test_envvars.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/tests/config/test_envvars.py b/modin/tests/config/test_envvars.py index debc5514853..34110e62014 100644 --- a/modin/tests/config/test_envvars.py +++ b/modin/tests/config/test_envvars.py @@ -18,8 +18,8 @@ import pytest from packaging import version -import modin.pandas as pd import modin.config as cfg +import modin.pandas as pd from modin.config.envvars import _check_vars from modin.config.pubsub import _UNSET, ExactStr