diff --git a/docs/usage_guide/optimization_notes/index.rst b/docs/usage_guide/optimization_notes/index.rst index aadd813e318..e805917554c 100644 --- a/docs/usage_guide/optimization_notes/index.rst +++ b/docs/usage_guide/optimization_notes/index.rst @@ -37,6 +37,34 @@ Range-partitioning is not a silver bullet, meaning that enabling it is not alway a link to the list of operations that have support for range-partitioning and practical advices on when one should enable it: :doc:`operations that support range-partitioning `. +Dynamic-partitioning in Modin +""""""""""""""""""""""""""""" + +Ray enigne experiences slowdowns when running a large number of small remote tasks at the same time. Ray Core recommends to `avoid tiny task`_. +When modin DataFrame has a large number of partitions, some functions produce a large number of remote tasks, which can cause slowdowns. +To solve this problem, Modin suggests using dynamic partitioning. This approach reduces the number of remote tasks +by combining multiple partitions into a single virtual partition and perform a common remote task on them. + +Dynamic partitioning is typically used for operations that are fully or partially executed on all partitions separately. + +.. code-block:: python + + import modin.pandas as pd + from modin.config import context + + df = pd.DataFrame(...) + + with context(DynamicPartitioning=True): + df.abs() + +Dynamic partitioning is also not always useful, and this approach is usually used for medium-sized DataFrames with a large number of columns. +If the number of columns is small, the number of partitions will be close to the number of CPUs, and Ray will not have this problem. +If the DataFrame has too many rows, this is also not a good case for using Dynamic-partitioning, since each task is no longer tiny and performing +the combined tasks carries more overhead than assigning them separately. + +Unfortunately, the use of Dynamic-partitioning depends on various factors such as data size, number of CPUs, operations performed, +and it is up to the user to determine whether Dynamic-partitioning will give a boost in his case or not. + Understanding Modin's partitioning mechanism """""""""""""""""""""""""""""""""""""""""""" @@ -311,3 +339,4 @@ an inner join you may want to swap left and right DataFrames. Note that result columns order may differ for first and second ``merge``. .. _range-partitioning: https://www.techopedia.com/definition/31994/range-partitioning +.. _`avoid tiny task`: https://docs.ray.io/en/latest/ray-core/tips-for-first-time.html#tip-2-avoid-tiny-tasks diff --git a/modin/config/__init__.py b/modin/config/__init__.py index cf5f7895c5d..60806a79231 100644 --- a/modin/config/__init__.py +++ b/modin/config/__init__.py @@ -23,6 +23,7 @@ CpuCount, DaskThreadsPerWorker, DocModule, + DynamicPartitioning, Engine, EnvironmentVariable, GithubCI, @@ -95,6 +96,7 @@ "AsyncReadMode", "ReadSqlEngine", "IsExperimental", + "DynamicPartitioning", # For tests "TrackFileLeaks", "TestReadFromSqlServer", diff --git a/modin/config/envvars.py b/modin/config/envvars.py index 3635c63d026..60d82e4a22c 100644 --- a/modin/config/envvars.py +++ b/modin/config/envvars.py @@ -892,6 +892,18 @@ class DaskThreadsPerWorker(EnvironmentVariable, type=int): default = 1 +class DynamicPartitioning(EnvironmentVariable, type=bool): + """ + Set to true to use Modin's dynamic-partitioning implementation where possible. + + Please refer to documentation for cases where enabling this options would be beneficial: + https://modin.readthedocs.io/en/stable/usage_guide/optimization_notes/index.html#dynamic-partitioning-in-modin + """ + + varname = "MODIN_DYNAMIC_PARTITIONING" + default = False + + def _check_vars() -> None: """ Check validity of environment variables. diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index 8b4f6788931..05854239206 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -30,6 +30,7 @@ from modin.config import ( BenchmarkMode, CpuCount, + DynamicPartitioning, Engine, MinColumnPartitionSize, MinRowPartitionSize, @@ -675,7 +676,7 @@ def map_partitions( NumPy array An array of partitions """ - if np.prod(partitions.shape) <= 1.5 * CpuCount.get(): + if not DynamicPartitioning.get(): # block-wise map new_partitions = cls.base_map_partitions( partitions, map_func, func_args, func_kwargs diff --git a/modin/tests/core/storage_formats/pandas/test_internals.py b/modin/tests/core/storage_formats/pandas/test_internals.py index 4113f3ce0ed..b030fe7b216 100644 --- a/modin/tests/core/storage_formats/pandas/test_internals.py +++ b/modin/tests/core/storage_formats/pandas/test_internals.py @@ -2658,7 +2658,7 @@ def remote_func(): ), ], ) -def test_map_approaches(partitioning_scheme, expected_map_approach): +def test_dynamic_partitioning(partitioning_scheme, expected_map_approach): data_size = MinRowPartitionSize.get() * CpuCount.get() data = {f"col{i}": np.ones(data_size) for i in range(data_size)} df = pandas.DataFrame(data) @@ -2672,8 +2672,9 @@ def test_map_approaches(partitioning_scheme, expected_map_approach): expected_map_approach, wraps=getattr(partition_mgr_cls, expected_map_approach), ) as expected_method: - partition_mgr_cls.map_partitions(partitions, lambda x: x * 2) - expected_method.assert_called() + with context(DynamicPartitioning=True): + partition_mgr_cls.map_partitions(partitions, lambda x: x * 2) + expected_method.assert_called() def test_map_partitions_joined_by_column():