Skip to content

Commit

Permalink
FEAT-#7368: Add a new environment variable for using dynamic partitio…
Browse files Browse the repository at this point in the history
…ning

Signed-off-by: Kirill Suvorov <[email protected]>
  • Loading branch information
Retribution98 committed Aug 13, 2024
1 parent b236b76 commit 3de5359
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 4 deletions.
29 changes: 29 additions & 0 deletions docs/usage_guide/optimization_notes/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,34 @@ Range-partitioning is not a silver bullet, meaning that enabling it is not alway
a link to the list of operations that have support for range-partitioning and practical advices on when one should
enable it: :doc:`operations that support range-partitioning </usage_guide/optimization_notes/range_partitioning_ops>`.

Dynamic-partitioning in Modin
"""""""""""""""""""""""""""""

Ray enigne experiences slowdowns when running a large number of small remote tasks at the same time. Ray Core recommends to `avoid tiny task`_.
When modin DataFrame has a large number of partitions, some functions produce a large number of remote tasks, which can cause slowdowns.
To solve this problem, Modin suggests using dynamic partitioning. This approach reduces the number of remote tasks
by combining multiple partitions into a single virtual partition and perform a common remote task on them.

Dynamic partitioning is typically used for operations that are fully or partially executed on all partitions separately.

.. code-block:: python
import modin.pandas as pd
from modin.config import context
df = pd.DataFrame(...)
with context(DynamicPartitioning=True):
df.abs()
Dynamic partitioning is also not always useful, and this approach is usually used for medium-sized DataFrames with a large number of columns.
If the number of columns is small, the number of partitions will be close to the number of CPUs, and Ray will not have this problem.
If the DataFrame has too many rows, this is also not a good case for using Dynamic-partitioning, since each task is no longer tiny and performing
the combined tasks carries more overhead than assigning them separately.

Unfortunately, the use of Dynamic-partitioning depends on various factors such as data size, number of CPUs, operations performed,
and it is up to the user to determine whether Dynamic-partitioning will give a boost in his case or not.

Understanding Modin's partitioning mechanism
""""""""""""""""""""""""""""""""""""""""""""

Expand Down Expand Up @@ -311,3 +339,4 @@ an inner join you may want to swap left and right DataFrames.
Note that result columns order may differ for first and second ``merge``.

.. _range-partitioning: https://www.techopedia.com/definition/31994/range-partitioning
.. _`avoid tiny task`: https://docs.ray.io/en/latest/ray-core/tips-for-first-time.html#tip-2-avoid-tiny-tasks
2 changes: 2 additions & 0 deletions modin/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
CpuCount,
DaskThreadsPerWorker,
DocModule,
DynamicPartitioning,
Engine,
EnvironmentVariable,
GithubCI,
Expand Down Expand Up @@ -95,6 +96,7 @@
"AsyncReadMode",
"ReadSqlEngine",
"IsExperimental",
"DynamicPartitioning",
# For tests
"TrackFileLeaks",
"TestReadFromSqlServer",
Expand Down
12 changes: 12 additions & 0 deletions modin/config/envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,18 @@ class DaskThreadsPerWorker(EnvironmentVariable, type=int):
default = 1


class DynamicPartitioning(EnvironmentVariable, type=bool):
"""
Set to true to use Modin's dynamic-partitioning implementation where possible.
Please refer to documentation for cases where enabling this options would be beneficial:
https://modin.readthedocs.io/en/stable/usage_guide/optimization_notes/index.html#dynamic-partitioning-in-modin
"""

varname = "MODIN_DYNAMIC_PARTITIONING"
default = False


def _check_vars() -> None:
"""
Check validity of environment variables.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from modin.config import (
BenchmarkMode,
CpuCount,
DynamicPartitioning,
Engine,
MinColumnPartitionSize,
MinRowPartitionSize,
Expand Down Expand Up @@ -675,7 +676,7 @@ def map_partitions(
NumPy array
An array of partitions
"""
if np.prod(partitions.shape) <= 1.5 * CpuCount.get():
if not DynamicPartitioning.get():
# block-wise map
new_partitions = cls.base_map_partitions(
partitions, map_func, func_args, func_kwargs
Expand Down
7 changes: 4 additions & 3 deletions modin/tests/core/storage_formats/pandas/test_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -2658,7 +2658,7 @@ def remote_func():
),
],
)
def test_map_approaches(partitioning_scheme, expected_map_approach):
def test_dynamic_partitioning(partitioning_scheme, expected_map_approach):
data_size = MinRowPartitionSize.get() * CpuCount.get()
data = {f"col{i}": np.ones(data_size) for i in range(data_size)}
df = pandas.DataFrame(data)
Expand All @@ -2672,8 +2672,9 @@ def test_map_approaches(partitioning_scheme, expected_map_approach):
expected_map_approach,
wraps=getattr(partition_mgr_cls, expected_map_approach),
) as expected_method:
partition_mgr_cls.map_partitions(partitions, lambda x: x * 2)
expected_method.assert_called()
with context(DynamicPartitioning=True):
partition_mgr_cls.map_partitions(partitions, lambda x: x * 2)
expected_method.assert_called()


def test_map_partitions_joined_by_column():
Expand Down

0 comments on commit 3de5359

Please sign in to comment.