PERF-#2813: Distributed from_pandas() for numerical data in Ray (#6640)

Signed-off-by: Dmitry Chigarev <[email protected]>
Co-authored-by: Iaroslav Igoshev <[email protected]>
dchigarev and YarShev authored Oct 16, 2023
1 parent 3dbdfc6 commit 658f6e1
Showing 4 changed files with 149 additions and 37 deletions.
16 changes: 14 additions & 2 deletions modin/config/envvars.py
@@ -660,8 +660,20 @@ class AsyncReadMode(EnvironmentVariable, type=bool):
"""
It does not wait for the end of reading information from the source.
Can break situations when reading occurs in a context, when exiting
from which the source is deleted.
This means that the reading function only launches tasks for the dataframe
to be read/created, but does not ensure that the construction is finalized by the time
the reading function returns a dataframe.

This option was introduced to improve the performance of reading/construction
of Modin DataFrames; however, it may also:

1. Increase peak memory consumption, since the garbage collection of the
temporary objects created during the reading is now also lazy and will only
be performed when the reading/construction is actually finished.
2. Break cases where the source is manually deleted after the reading
function returns a result, for example, when reading inside a context block
that deletes the file on ``__exit__()``.
"""

varname = "MODIN_ASYNC_READ_MODE"
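For illustration, a minimal sketch of how this option can be toggled (assuming Modin's standard config API; `read_csv` and the file name are placeholders, not part of this change):

import modin.pandas as pd
from modin.config import AsyncReadMode

AsyncReadMode.put(True)  # equivalent to setting MODIN_ASYNC_READ_MODE=True
df = pd.read_csv("data.csv")  # may return before the partitions are fully materialized
# per point 2 above: deleting "data.csv" right here could break the lazy read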
92 changes: 57 additions & 35 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
@@ -766,6 +766,53 @@ def to_numpy(cls, partitions, **kwargs):
[[block.to_numpy(**kwargs) for block in row] for row in partitions]
)

@classmethod
def split_pandas_df_into_partitions(
cls, df, row_chunksize, col_chunksize, update_bar
):
"""
Split given pandas DataFrame according to the row/column chunk sizes into distributed partitions.
Parameters
----------
df : pandas.DataFrame
row_chunksize : int
col_chunksize : int
update_bar : callable(x) -> x
Function that updates a progress bar.
Returns
-------
2D np.ndarray[PandasDataframePartition]
"""
put_func = cls._partition_class.put
# even a full-axis slice can cost something (https://github.com/pandas-dev/pandas/issues/55202),
# so we try not to do it if unnecessary.
# FIXME: it appears that this optimization doesn't work correctly for Unidist, as it
# doesn't explicitly copy the data when putting it into storage (as the rest of the engines do),
# causing it to eventually share memory with the pandas object that was provided by the user.
# Everything works fine if we do this column slicing, as pandas then apparently sets some flags
# to operate in COW mode (and so it doesn't crash our tests).
# @YarShev promised that this will eventually be fixed on Unidist's side, but for now
# there's this hacky condition
if col_chunksize >= len(df.columns) and Engine.get() != "Unidist":
col_parts = [df]
else:
col_parts = [
df.iloc[:, i : i + col_chunksize]
for i in range(0, len(df.columns), col_chunksize)
]
parts = [
[
update_bar(
put_func(col_part.iloc[i : i + row_chunksize]),
)
for col_part in col_parts
]
for i in range(0, len(df), row_chunksize)
]
return np.array(parts)
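As a standalone illustration of the slicing arithmetic above (plain pandas in place of `put_func`/`update_bar`, sizes chosen arbitrarily): a 10x3 frame with `row_chunksize=4` and `col_chunksize=2` yields a 3x2 grid of partitions.

import numpy as np
import pandas as pd

df = pd.DataFrame(np.arange(30).reshape(10, 3))
row_chunksize, col_chunksize = 4, 2
grid = [
    [
        df.iloc[i : i + row_chunksize, j : j + col_chunksize]
        for j in range(0, len(df.columns), col_chunksize)
    ]
    for i in range(0, len(df), row_chunksize)
]
assert (len(grid), len(grid[0])) == (3, 2)  # row splits of 4+4+2, column splits of 2+1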

@classmethod
@wait_computations_if_benchmark_mode
def from_pandas(cls, df, return_dims=False):
@@ -785,14 +832,7 @@ def from_pandas(cls, df, return_dims=False):
np.ndarray or (np.ndarray, row_lengths, col_widths)
A NumPy array with partitions (with dimensions or not).
"""

def update_bar(pbar, f):
if ProgressBar.get():
pbar.update(1)
return f

num_splits = NPartitions.get()
put_func = cls._partition_class.put
row_chunksize = compute_chunksize(df.shape[0], num_splits)
col_chunksize = compute_chunksize(df.shape[1], num_splits)

@@ -820,36 +860,18 @@ def update_bar(pbar, f):
else:
pbar = None

# even a full-axis slice can cost something (https://github.com/pandas-dev/pandas/issues/55202)
# so we try not to do it if unnecessary.
# FIXME: it appears that this optimization doesn't work for Unidist correctly as it
# doesn't explicitly copy the data when putting it into storage (as the rest engines do)
# causing it to eventially share memory with a pandas object that was provided by user.
# Everything works fine if we do this column slicing as pandas then would set some flags
# to perform in COW mode apparently (and so it wouldn't crash our tests).
# @YarShev promised that this will be eventially fixed on Unidist's side, but for now there's
# this hacky condition
if col_chunksize >= len(df.columns) and Engine.get() != "Unidist":
col_parts = [df]
else:
col_parts = [
df.iloc[:, i : i + col_chunksize]
for i in range(0, len(df.columns), col_chunksize)
]
parts = [
[
update_bar(
pbar,
put_func(col_part.iloc[i : i + row_chunksize]),
)
for col_part in col_parts
]
for i in range(0, len(df), row_chunksize)
]
def update_bar(f):
if ProgressBar.get():
pbar.update(1)
return f

parts = cls.split_pandas_df_into_partitions(
df, row_chunksize, col_chunksize, update_bar
)
if ProgressBar.get():
pbar.close()
if not return_dims:
return np.array(parts)
return parts
else:
row_lengths = [
row_chunksize
if i + row_chunksize < len(df)
else len(df) % row_chunksize or row_chunksize
for i in range(0, len(df), row_chunksize)
]
col_widths = [
col_chunksize
if i + col_chunksize < len(df.columns)
@@ -863,7 +885,7 @@ def update_bar(pbar, f):
else len(df.columns) % col_chunksize or col_chunksize
for i in range(0, len(df.columns), col_chunksize)
]
return np.array(parts), row_lengths, col_widths
return parts, row_lengths, col_widths
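The `len(df) % row_chunksize or row_chunksize` idiom above computes the length of the trailing chunk; a quick standalone check of the arithmetic (the helper name is ours, for illustration only):

def chunk_lengths(n, chunksize):
    # mirrors the row_lengths/col_widths comprehensions above for one axis
    return [
        chunksize if i + chunksize < n else n % chunksize or chunksize
        for i in range(0, n, chunksize)
    ]

assert chunk_lengths(10, 4) == [4, 4, 2]  # the remainder becomes the last chunk
assert chunk_lengths(8, 4) == [4, 4]  # evenly divisible: `or` falls back to chunksize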

@classmethod
def from_arrow(cls, at, return_dims=False):
67 changes: 67 additions & 0 deletions modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py
@@ -13,11 +13,17 @@

"""Module houses class that implements ``GenericRayDataframePartitionManager`` using Ray."""

import numpy as np
from pandas.core.dtypes.common import is_numeric_dtype

from modin.config import AsyncReadMode
from modin.core.execution.modin_aqp import progress_bar_wrapper
from modin.core.execution.ray.common import RayWrapper
from modin.core.execution.ray.generic.partitioning import (
GenericRayDataframePartitionManager,
)
from modin.logging import get_logger
from modin.utils import _inherit_docstrings

from .partition import PandasOnRayDataframePartition
from .virtual_partition import (
@@ -51,6 +57,67 @@ def wait_partitions(cls, partitions):
[block for partition in partitions for block in partition.list_of_blocks]
)

@classmethod
@_inherit_docstrings(
GenericRayDataframePartitionManager.split_pandas_df_into_partitions
)
def split_pandas_df_into_partitions(
cls, df, row_chunksize, col_chunksize, update_bar
):
# it was found that under the following conditions it's more beneficial
# to use the distributed splitting; let's break them down:
# 1. The distributed splitting is only used when there are more than 6 million elements
# in the `df`, as with less data it's better to use the sequential splitting
# 2. It is only used with numerical data, as with other dtypes putting the whole big
# dataframe into the storage takes too much time.
# 3. The distributed splitting consumes more memory than the sequential one.
# It was estimated to require ~2.5x of the dataframe size; for now, no good way
# was found to automatically fall back to the sequential implementation
# in case there's not enough memory, so currently we're enabling
# the distributed version only if 'AsyncReadMode' is set to True. Follow this
# discussion for more info on why automatic dispatching is hard:
# https://github.com/modin-project/modin/pull/6640#issuecomment-1759932664
enough_elements = (len(df) * len(df.columns)) > 6_000_000
all_numeric_types = all(is_numeric_dtype(dtype) for dtype in df.dtypes)
async_mode_on = AsyncReadMode.get()

distributed_splitting = enough_elements and all_numeric_types and async_mode_on

log = get_logger()

if not distributed_splitting:
log.info(
"Using sequential splitting in '.from_pandas()' because of some of the conditions are False: "
+ f"{enough_elements=}; {all_numeric_types=}; {async_mode_on=}"
)
return super().split_pandas_df_into_partitions(
df, row_chunksize, col_chunksize, update_bar
)

log.info("Using distributed splitting in '.from_pandas()'")
put_func = cls._partition_class.put

def mask(part, row_loc, col_loc):
# 2D iloc is surprisingly slow, so we do chained iloc calls instead:
# https://github.com/pandas-dev/pandas/issues/55202
return part.apply(lambda df: df.iloc[row_loc, :].iloc[:, col_loc])

main_part = put_func(df)
parts = [
[
update_bar(
mask(
main_part,
slice(i, i + row_chunksize),
slice(j, j + col_chunksize),
),
)
for j in range(0, len(df.columns), col_chunksize)
]
for i in range(0, len(df), row_chunksize)
]
return np.array(parts)
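For reference, a standalone sketch of the chained-iloc trick used in `mask()` above (sizes are arbitrary; the speed difference is the one reported in the pandas issue linked in the comment):

import numpy as np
import pandas as pd

df = pd.DataFrame(np.random.rand(100_000, 64))
row_loc, col_loc = slice(0, 10_000), slice(0, 8)

slow = df.iloc[row_loc, col_loc]  # single 2D iloc: the slower path
fast = df.iloc[row_loc, :].iloc[:, col_loc]  # chained 1D ilocs: the faster path
assert slow.equals(fast)  # both produce the same chunk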


def _make_wrapped_method(name: str):
"""
11 changes: 11 additions & 0 deletions modin/pandas/test/test_io.py
@@ -3146,6 +3146,17 @@ def test_from_arrow():
df_equals(modin_df, pandas_df)


@pytest.mark.skipif(
condition=Engine.get() != "Ray",
reason="Distributed 'from_pandas' is only available for Ray engine",
)
@pytest.mark.parametrize("modify_config", [{AsyncReadMode: True}], indirect=True)
def test_distributed_from_pandas(modify_config):
pandas_df = pandas.DataFrame({f"col{i}": np.arange(200_000) for i in range(64)})
modin_df = pd.DataFrame(pandas_df)
df_equals(modin_df, pandas_df)
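For context, an end-to-end sketch of what this test exercises (assuming Ray is installed; `Engine.put`/`AsyncReadMode.put` are Modin's config setters): 200,000 rows x 64 numeric columns is 12.8M elements, above the 6M threshold, so constructing the Modin frame should take the distributed splitting branch.

import numpy as np
import pandas

import modin.pandas as pd
from modin.config import AsyncReadMode, Engine

Engine.put("Ray")  # select the Ray engine
AsyncReadMode.put(True)  # the distributed splitting is gated on this option
pandas_df = pandas.DataFrame({f"col{i}": np.arange(200_000) for i in range(64)})
modin_df = pd.DataFrame(pandas_df)  # from_pandas takes the distributed path here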


@pytest.mark.filterwarnings(default_to_pandas_ignore_string)
def test_from_spmatrix():
data = sparse.eye(3)