FEAT-#7368: Add a new environment variable for using dynamic partitioning #7369

Merged (3 commits, Aug 19, 2024)
33 changes: 33 additions & 0 deletions docs/usage_guide/optimization_notes/index.rst
@@ -37,6 +37,38 @@ Range-partitioning is not a silver bullet, meaning that enabling it is not always
a link to the list of operations that have support for range-partitioning and practical advice on when one should
enable it: :doc:`operations that support range-partitioning </usage_guide/optimization_notes/range_partitioning_ops>`.

Dynamic-partitioning in Modin
"""""""""""""""""""""""""""""

The Ray engine experiences slowdowns when running a large number of small remote tasks at the same time; Ray Core recommends that users `avoid tiny task`_ patterns.
When a Modin DataFrame has a large number of partitions, some functions produce a large number of remote tasks, which can cause slowdowns.
To mitigate this, Modin suggests using dynamic partitioning. This approach reduces the number of remote tasks
by combining multiple partitions into a single virtual partition and performing a common remote task on them.

Dynamic partitioning is typically used for operations that are fully or partially executed on all partitions separately.

.. code-block:: python

import modin.pandas as pd
from modin.config import context

df = pd.DataFrame(...)

with context(DynamicPartitioning=True):
df.abs()

Dynamic partitioning is not always beneficial either; it is typically useful for medium-sized DataFrames with a large number of columns.
If the number of columns is small, the number of partitions will be close to the number of CPUs, and Ray will not have this problem.
If the DataFrame has too many rows, dynamic partitioning is also not a good fit, since each task is no longer tiny and performing
the combined tasks carries more overhead than assigning them separately.

Unfortunately, the benefit of dynamic partitioning depends on various factors such as data size, number of CPUs, and the operations performed,
so it is up to the user to determine whether dynamic partitioning will give a boost in their case.

Review comment (Collaborator): If we plan to look for a heuristic that can switch implementations automatically, then we could add a few words about this (and a link to the issue).

..
TODO: Define heuristics to automatically enable dynamic partitioning without performance penalty.
`Issue #7370 <https://github.com/modin-project/modin/issues/7370>`_

Understanding Modin's partitioning mechanism
""""""""""""""""""""""""""""""""""""""""""""

@@ -311,3 +343,4 @@ an inner join you may want to swap left and right DataFrames.
Note that result columns order may differ for first and second ``merge``.

.. _range-partitioning: https://www.techopedia.com/definition/31994/range-partitioning
.. _`avoid tiny task`: https://docs.ray.io/en/latest/ray-core/tips-for-first-time.html#tip-2-avoid-tiny-tasks
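The docs above describe when dynamic partitioning pays off: when the partition count greatly exceeds the CPU count, so that many tiny remote tasks would otherwise be scheduled. A minimal, illustrative sketch of that reasoning (plain Python, not Modin's internal API; the function name and the reuse of the `1.5 * CpuCount` threshold from the diff below are assumptions for illustration):

```python
def should_consider_dynamic_partitioning(row_parts: int, col_parts: int, cpus: int) -> bool:
    """Sketch of the docs' guidance: dynamic partitioning is worth trying
    when the number of partitions greatly exceeds the number of CPUs.

    Mirrors the ``1.5 * CpuCount`` threshold used in ``map_partitions``.
    """
    return row_parts * col_parts > 1.5 * cpus


# A frame with a 16x16 partition grid on 8 CPUs: 256 tiny tasks for 8 cores.
print(should_consider_dynamic_partitioning(16, 16, 8))  # True

# A frame whose partition count is already close to the CPU count.
print(should_consider_dynamic_partitioning(2, 4, 8))  # False
```

This is only a decision sketch; as the docs note, the real trade-off also depends on data size and the operation performed.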
2 changes: 2 additions & 0 deletions modin/config/__init__.py
@@ -23,6 +23,7 @@
CpuCount,
DaskThreadsPerWorker,
DocModule,
DynamicPartitioning,
Engine,
EnvironmentVariable,
GithubCI,
@@ -95,6 +96,7 @@
"AsyncReadMode",
"ReadSqlEngine",
"IsExperimental",
"DynamicPartitioning",
# For tests
"TrackFileLeaks",
"TestReadFromSqlServer",
12 changes: 12 additions & 0 deletions modin/config/envvars.py
@@ -892,6 +892,18 @@ class DaskThreadsPerWorker(EnvironmentVariable, type=int):
default = 1


class DynamicPartitioning(EnvironmentVariable, type=bool):
"""
Set to true to use Modin's dynamic-partitioning implementation where possible.

Please refer to the documentation for cases where enabling this option would be beneficial:
https://modin.readthedocs.io/en/stable/usage_guide/optimization_notes/index.html#dynamic-partitioning-in-modin
"""

varname = "MODIN_DYNAMIC_PARTITIONING"
default = False
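A simplified model of how such an `EnvironmentVariable`-backed boolean config resolves (this is a sketch, not Modin's actual `EnvironmentVariable` machinery; the helper name is hypothetical): the environment variable wins when set, otherwise the class default applies.

```python
import os


def get_bool_config(varname: str, default: bool) -> bool:
    """Sketch of boolean env-var resolution: env var if set, else default."""
    raw = os.environ.get(varname)
    if raw is None:
        return default
    return raw.strip().lower() in ("true", "1", "yes")


os.environ.pop("MODIN_DYNAMIC_PARTITIONING", None)
print(get_bool_config("MODIN_DYNAMIC_PARTITIONING", False))  # False (class default)

os.environ["MODIN_DYNAMIC_PARTITIONING"] = "True"
print(get_bool_config("MODIN_DYNAMIC_PARTITIONING", False))  # True (env var wins)
```

In real usage, setting ``MODIN_DYNAMIC_PARTITIONING=True`` before importing Modin, or using ``modin.config.context(DynamicPartitioning=True)``, achieves the same effect.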


def _check_vars() -> None:
"""
Check validity of environment variables.
@@ -30,6 +30,7 @@
from modin.config import (
BenchmarkMode,
CpuCount,
DynamicPartitioning,
Engine,
MinColumnPartitionSize,
MinRowPartitionSize,
@@ -675,7 +676,7 @@ def map_partitions(
NumPy array
An array of partitions
"""
if np.prod(partitions.shape) <= 1.5 * CpuCount.get():
if not DynamicPartitioning.get():
Review comment (Collaborator): It would be good to keep the previous default behavior.

Suggested change:
    if not DynamicPartitioning.get():
→   if np.prod(partitions.shape) <= 1.5 * CpuCount.get() and not DynamicPartitioning.get():

Reply (Collaborator, Author): Do you think we should do that? Shouldn't the user be given more freedom to decide when to activate this option?
The user can activate it locally, only for the required operations.

Reply (Collaborator): The user now has the ability to force the use of another code branch at their own choice, which is already more flexibility than before. And since this condition worked quite well before, and considering that slowdowns are possible when using this new variable, I would replace the default behavior more carefully.

Reply (Collaborator): I was confused by similar code in the rebalance_partitions function. OK, leave it as is.

# block-wise map
new_partitions = cls.base_map_partitions(
partitions, map_func, func_args, func_kwargs
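The two branches in the diff above differ mainly in task granularity. A toy sketch (plain Python, not Modin internals; the function names are hypothetical) contrasting the strategies: block-wise mapping submits one remote task per partition, while dynamic partitioning batches several partitions into one "virtual" task.

```python
def block_wise_tasks(partitions):
    """One task per partition -- fine when partition count ~ CPU count."""
    return [[p] for p in partitions]


def dynamic_tasks(partitions, batch):
    """One task per batch of `batch` partitions -- fewer, larger tasks."""
    return [partitions[i:i + batch] for i in range(0, len(partitions), batch)]


parts = list(range(12))
print(len(block_wise_tasks(parts)))  # 12 remote tasks
print(len(dynamic_tasks(parts, 4)))  # 3 remote tasks covering the same data
```

The batching cuts scheduling overhead at the cost of coarser tasks, which is why the docs recommend it only when individual tasks would otherwise be tiny.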
7 changes: 4 additions & 3 deletions modin/tests/core/storage_formats/pandas/test_internals.py
@@ -2658,7 +2658,7 @@ def remote_func():
),
],
)
def test_map_approaches(partitioning_scheme, expected_map_approach):
def test_dynamic_partitioning(partitioning_scheme, expected_map_approach):
data_size = MinRowPartitionSize.get() * CpuCount.get()
data = {f"col{i}": np.ones(data_size) for i in range(data_size)}
df = pandas.DataFrame(data)
@@ -2672,8 +2672,9 @@ def test_map_approaches(partitioning_scheme, expected_map_approach):
expected_map_approach,
wraps=getattr(partition_mgr_cls, expected_map_approach),
) as expected_method:
partition_mgr_cls.map_partitions(partitions, lambda x: x * 2)
expected_method.assert_called()
with context(DynamicPartitioning=True):
partition_mgr_cls.map_partitions(partitions, lambda x: x * 2)
expected_method.assert_called()


def test_map_partitions_joined_by_column():
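The test in the diff above relies on `unittest.mock.patch.object(..., wraps=...)`, which records calls while preserving the original behavior, so the test can assert which map implementation was dispatched to without changing the result. A minimal, self-contained illustration of that pattern (the `Mgr` class is hypothetical):

```python
from unittest import mock


class Mgr:
    """Stand-in for a partition manager with a mappable classmethod."""

    @classmethod
    def base_map(cls, x):
        return x * 2


# wraps=Mgr.base_map keeps the real implementation running underneath the
# mock, so the call is both observed and executed.
with mock.patch.object(Mgr, "base_map", wraps=Mgr.base_map) as spy:
    result = Mgr.base_map(21)
    spy.assert_called_once()

print(result)  # 42 -- original behavior preserved
```

This is the same spying technique the renamed `test_dynamic_partitioning` test uses to verify that `map_partitions` takes the expected code path.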