FIX-#6594: fix usage of Modin objects inside UDFs for apply #6673

Merged · 9 commits · Nov 14, 2023
10 changes: 10 additions & 0 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -4119,6 +4119,16 @@ def wait_computations(self):
"""Wait for all computations to complete without materializing data."""
self._partition_mgr_cls.wait_partitions(self._partitions.flatten())

def support_materialization_in_worker_process(self) -> bool:
"""
Whether it is possible to call `to_pandas` during the pickling process, at the moment the object is recreated.

Returns
-------
bool
"""
return True

Collaborator:

why would we ever want to recreate modin objects on a worker? wouldn't it make applying any method to them super slow anyway?

Collaborator (author):

> why would we ever want to recreate modin objects on a worker?

The main goal is to avoid materializing the dataframe in the main process and to move that work to the worker process instead.

> wouldn't it make applying any method to them super slow anyway?

The only extra operation needed is fetching the object by reference (a `get` operation) so the worker can convert it to pandas and run any further operations on the pandas object.
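A minimal sketch of that mechanism (the class and all names here are illustrative, not Modin's actual internals): `__reduce__` captures the originating PID, and unpickling in a different process yields a plain pandas object instead of the distributed wrapper.

import os
import pickle

import pandas


class LazyFrame:
    # Toy stand-in for a Modin DataFrame (illustrative only).
    def __init__(self, pdf: pandas.DataFrame):
        self._pdf = pdf

    def to_pandas(self) -> pandas.DataFrame:
        return self._pdf

    @classmethod
    def _inflate(cls, pdf, source_pid):
        # In a worker process the PID differs, so hand back plain
        # pandas instead of recreating the distributed wrapper.
        if os.getpid() != source_pid:
            return pdf
        return cls(pdf)

    def __reduce__(self):
        return self._inflate, (self.to_pandas(), os.getpid())


df = LazyFrame(pandas.DataFrame({"a": [1, 2]}))
same_process_copy = pickle.loads(pickle.dumps(df))
assert isinstance(same_process_copy, LazyFrame)  # PID matched, wrapper restored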


def __dataframe__(self, nan_as_null: bool = False, allow_copy: bool = True):
"""
Get a Modin DataFrame that implements the dataframe exchange protocol.
23 changes: 21 additions & 2 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
@@ -27,7 +27,13 @@
import pandas
from pandas._libs.lib import no_default

from modin.config import BenchmarkMode, Engine, NPartitions, ProgressBar
from modin.config import (
BenchmarkMode,
Engine,
NPartitions,
PersistentPickle,
ProgressBar,
)
from modin.core.dataframe.pandas.utils import concatenate
from modin.core.storage_formats.pandas.utils import compute_chunksize
from modin.error_message import ErrorMessage
@@ -121,7 +127,20 @@ def preprocess_func(cls, map_func):
`map_func` if the `apply` method of the `PandasDataframePartition` object
you are using does not require any modification to a given function.
"""
return cls._partition_class.preprocess_func(map_func)
old_value = PersistentPickle.get()
# When a function references Modin objects, it is cheaper to convert
# them to pandas once in the main process than several times in
# worker processes. Details: https://github.com/modin-project/modin/pull/6673/files#r1391086755
# For Dask this update is skipped, since it could otherwise raise:
# `coroutine 'Client._gather' was never awaited`
need_update = not PersistentPickle.get() and Engine.get() != "Dask"

Collaborator (author):

@dchigarev I suppose we can leave the current implementation, but for those cases where `to_pandas` is called several times (for example, in different `apply` calls that go through `preprocess_func`), enable a mode in which `to_pandas` is called once in the main process.

Collaborator:

> I suppose we can leave the current implementation

You mean `._inflate_light()`? Where else do we use it? If we are certain that we always want dataframes to be persistently pickled, then I suppose we shouldn't keep this implementation. There may still be other places in the project where we submit kernels without this config-variable manipulation, which would mean the `._inflate_light()` implementation gets called against our will.

Collaborator:

My take is that we should either drop the `._inflate_light()` implementation altogether, or remove these config-variable manipulations and always execute `.__reduce__()` in accordance with what the user set in the `PersistentPickle` variable. At this point I'm OK with both options; it's up to you to decide @anmyachev

Collaborator (author):

> You mean `._inflate_light()`?

Yes

> Where else do we use it?

In a situation like this (it seems to me this case is almost never seen; if you agree, I'll delete `_inflate_light`):

other = pickle.loads(pickle.dumps(modin_df))

Collaborator:

On second thought, maybe it makes sense to keep `_inflate_light`; let's leave it for now.

if need_update:
PersistentPickle.put(True)
try:
result = cls._partition_class.preprocess_func(map_func)
finally:
if need_update:
PersistentPickle.put(old_value)
return result

# END Abstract Methods

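The save/toggle/restore sequence in `preprocess_func` is a standard pattern for temporarily forcing a process-wide config value. A minimal, self-contained sketch of the same idea as a context manager (this helper is illustrative and not part of Modin; `config` is assumed to expose `get()`/`put()` like Modin's config parameters):

from contextlib import contextmanager


@contextmanager
def forced_value(config, value):
    # Temporarily force a config parameter, restoring the old value
    # even if the wrapped block raises.
    old = config.get()
    config.put(value)
    try:
        yield
    finally:
        config.put(old)


# Hypothetical usage mirroring preprocess_func:
# with forced_value(PersistentPickle, True):
#     result = cls._partition_class.preprocess_func(map_func)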
23 changes: 23 additions & 0 deletions modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py
@@ -41,3 +41,26 @@ class PandasOnDaskDataframe(PandasDataframe):
"""

_partition_mgr_cls = PandasOnDaskDataframePartitionManager

@classmethod
def reconnect(cls, address, attributes): # noqa: GL08
# The main goal is to configure the client for the worker process
# using the address passed by the custom `__reduce__` function
try:
from distributed import default_client

default_client()
except ValueError:
from distributed import Client

# setup `default_client` for worker process
_ = Client(address)
obj = cls.__new__(cls)
obj.__dict__.update(attributes)
return obj

def __reduce__(self): # noqa: GL08
from distributed import default_client

address = default_client().scheduler_info()["address"]
return self.reconnect, (address, self.__dict__)
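For context, `distributed.default_client()` raises a `ValueError` when no client is registered in the current process, which is what the `try`/`except` above relies on. A hedged sketch of the round-trip this enables (assumes a reachable Dask cluster; `Client.submit` is the standard `distributed` API, while `kernel` and `frame` are illustrative):

import pickle

from distributed import Client

client = Client()  # main process: creates or attaches to a cluster


def kernel(payload):
    # Unpickling invokes PandasOnDaskDataframe.reconnect on the worker,
    # which registers a default client there before restoring the object.
    obj = pickle.loads(payload)
    return type(obj).__name__


# Illustrative only -- `frame` would be a PandasOnDaskDataframe:
# future = client.submit(kernel, pickle.dumps(frame))
# print(future.result())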
4 changes: 4 additions & 0 deletions modin/core/execution/unidist/implementations/pandas_on_unidist/dataframe/dataframe.py
@@ -41,3 +41,7 @@ class PandasOnUnidistDataframe(PandasDataframe):
"""

_partition_mgr_cls = PandasOnUnidistDataframePartitionManager

def support_materialization_in_worker_process(self) -> bool:
# more details on why this is not `True`: https://github.com/modin-project/modin/pull/6673
return False
10 changes: 10 additions & 0 deletions modin/core/storage_formats/base/query_compiler.py
@@ -325,6 +325,16 @@ def execute(self):
"""Wait for all computations to complete without materializing data."""
pass

def support_materialization_in_worker_process(self) -> bool:
"""
Whether it is possible to call `to_pandas` during the pickling process, at the moment the object is recreated.

Returns
-------
bool
"""
return self._modin_frame.support_materialization_in_worker_process()

# END Data Management Methods

# To/From Pandas
3 changes: 3 additions & 0 deletions modin/experimental/core/storage_formats/hdk/query_compiler.py
@@ -191,6 +191,9 @@
# HDK-specific method
self._modin_frame.force_import()

def support_materialization_in_worker_process(self) -> bool:
return True

Codecov / codecov/patch warning on modin/experimental/core/storage_formats/hdk/query_compiler.py#L195: added line was not covered by tests.

def to_pandas(self):
return self._modin_frame.to_pandas()

29 changes: 24 additions & 5 deletions modin/pandas/dataframe.py
@@ -18,6 +18,7 @@
import datetime
import functools
import itertools
import os
import re
import sys
import warnings
@@ -3107,7 +3108,7 @@ def _getitem(self, key):

# Persistance support methods - BEGIN
@classmethod
def _inflate_light(cls, query_compiler):
def _inflate_light(cls, query_compiler, source_pid):
"""
Re-creates the object from previously-serialized lightweight representation.

@@ -3117,35 +3118,53 @@ def _inflate_light(cls, query_compiler, source_pid):
----------
query_compiler : BaseQueryCompiler
Query compiler to use for object re-creation.
source_pid : int
Determines whether a Modin or pandas object needs to be created.
Modin objects are created only on the main process.

Returns
-------
DataFrame
New ``DataFrame`` based on the `query_compiler`.
"""
if os.getpid() != source_pid:
return query_compiler.to_pandas()

Collaborator:

Correct me if I'm wrong, but my understanding is that we use `_inflate_light()` only when we unpickle a modin.DataFrame from plasma storage (if we wanted to read it from disk, we would use `_inflate_full()`). So my question is whether it makes sense to transfer the query compiler to workers and only then call `.to_pandas()`. Isn't calling `.to_pandas()` on every worker more expensive than calling it only once in the main process when serializing? What are the benefits of using `._inflate_light()`?

Collaborator:

This is the only question that bothers me; otherwise, the PR looks good.

Collaborator (author):

> Isn't calling `.to_pandas()` on every worker more expensive than calling it only once in the main process when serializing?

It may be more expensive, but this approach lets calculations run asynchronously, which partially mitigates the problem given that worker processes tend to be under-loaded. On the other hand, memory consumption would be much greater; I believe you are right and we need to make the call in the main process.

# The current logic does not involve creating Modin objects
# and manipulation with them in worker processes
return cls(query_compiler=query_compiler)

@classmethod
def _inflate_full(cls, pandas_df):
def _inflate_full(cls, pandas_df, source_pid):
"""
Re-creates the object from previously-serialized disk-storable representation.

Parameters
----------
pandas_df : pandas.DataFrame
Data to use for object re-creation.
source_pid : int
Determines whether a Modin or pandas object needs to be created.
Modin objects are created only on the main process.

Returns
-------
DataFrame
New ``DataFrame`` based on the `pandas_df`.
"""
if os.getpid() != source_pid:
return pandas_df
# The current logic does not involve creating Modin objects
# and manipulation with them in worker processes
return cls(data=from_pandas(pandas_df))

def __reduce__(self):
self._query_compiler.finalize()
if PersistentPickle.get():
return self._inflate_full, (self._to_pandas(),)
return self._inflate_light, (self._query_compiler,)
pid = os.getpid()
if (
PersistentPickle.get()
or not self._query_compiler.support_materialization_in_worker_process()
):
return self._inflate_full, (self._to_pandas(), pid)
return self._inflate_light, (self._query_compiler, pid)

# Persistance support methods - END
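A usage sketch of how `__reduce__` above chooses between the two paths; both round-trips below run in the source process, so each returns a Modin DataFrame (`PersistentPickle.get()`/`put()` are the same Modin config API used in `preprocess_func`):

import pickle

import modin.pandas as pd
from modin.config import PersistentPickle

df = pd.DataFrame({"a": [1, 2, 3]})

# Default on most engines: the _inflate_light path ships the query
# compiler together with os.getpid().
light_copy = pickle.loads(pickle.dumps(df))

# Forcing persistent pickling takes the _inflate_full path, which
# materializes to pandas up front:
old = PersistentPickle.get()
PersistentPickle.put(True)
try:
    persistent_copy = pickle.loads(pickle.dumps(df))
finally:
    PersistentPickle.put(old)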
29 changes: 24 additions & 5 deletions modin/pandas/series.py
@@ -15,6 +15,7 @@

from __future__ import annotations

import os
import warnings
from typing import IO, TYPE_CHECKING, Hashable, Optional, Union

@@ -2497,7 +2498,7 @@ def _repartition(self):

# Persistance support methods - BEGIN
@classmethod
def _inflate_light(cls, query_compiler, name):
def _inflate_light(cls, query_compiler, name, source_pid):
"""
Re-creates the object from previously-serialized lightweight representation.

@@ -2509,35 +2510,53 @@ def _inflate_light(cls, query_compiler, name, source_pid):
Query compiler to use for object re-creation.
name : str
The name to give to the new object.
source_pid : int
Determines whether a Modin or pandas object needs to be created.
Modin objects are created only on the main process.

Returns
-------
Series
New Series based on the `query_compiler`.
"""
if os.getpid() != source_pid:
return query_compiler.to_pandas()
# The current logic does not involve creating Modin objects
# and manipulation with them in worker processes
return cls(query_compiler=query_compiler, name=name)

@classmethod
def _inflate_full(cls, pandas_series):
def _inflate_full(cls, pandas_series, source_pid):
"""
Re-creates the object from previously-serialized disk-storable representation.

Parameters
----------
pandas_series : pandas.Series
Data to use for object re-creation.
source_pid : int
Determines whether a Modin or pandas object needs to be created.
Modin objects are created only on the main process.

Returns
-------
Series
New Series based on the `pandas_series`.
"""
if os.getpid() != source_pid:
return pandas_series
# The current logic does not involve creating Modin objects
# and manipulation with them in worker processes
return cls(data=pandas_series)

def __reduce__(self):
self._query_compiler.finalize()
if PersistentPickle.get():
return self._inflate_full, (self._to_pandas(),)
return self._inflate_light, (self._query_compiler, self.name)
pid = os.getpid()
if (
PersistentPickle.get()
or not self._query_compiler.support_materialization_in_worker_process()
):
return self._inflate_full, (self._to_pandas(), pid)
return self._inflate_light, (self._query_compiler, self.name, pid)

# Persistance support methods - END
29 changes: 28 additions & 1 deletion modin/pandas/test/dataframe/test_pickle.py
@@ -18,7 +18,7 @@

import modin.pandas as pd
from modin.config import PersistentPickle
from modin.pandas.test.utils import df_equals
from modin.pandas.test.utils import create_test_dfs, df_equals


@pytest.fixture
@@ -47,6 +47,33 @@ def test_dataframe_pickle(modin_df, persistent):
df_equals(modin_df, other)


def test__reduce__():
# `DataFrame.__reduce__` will be called implicitly when lambda expressions are
# pre-processed for the distributed engine.
dataframe_data = ["Major League Baseball", "National Basketball Association"]
abbr_md, abbr_pd = create_test_dfs(dataframe_data, index=["MLB", "NBA"])

dataframe_data = {
"name": ["Mariners", "Lakers"] * 500,
"league_abbreviation": ["MLB", "NBA"] * 500,
}
teams_md, teams_pd = create_test_dfs(dataframe_data)

result_md = (
teams_md.set_index("name")
.league_abbreviation.apply(lambda abbr: abbr_md[0].loc[abbr])
.rename("league")
)

result_pd = (
teams_pd.set_index("name")
.league_abbreviation.apply(lambda abbr: abbr_pd[0].loc[abbr])
.rename("league")
)
df_equals(result_md, result_pd)


def test_column_pickle(modin_column, modin_df, persistent):
dmp = pickle.dumps(modin_column)
other = pickle.loads(dmp)
27 changes: 27 additions & 0 deletions modin/pandas/test/test_series.py
@@ -47,6 +47,7 @@
bool_arg_keys,
bool_arg_values,
categories_equals,
create_test_dfs,
default_to_pandas_ignore_string,
df_equals,
df_equals_with_non_stable_indices,
@@ -4794,3 +4795,29 @@ def test_binary_numpy_universal_function_issue_6483():
*create_test_series(test_data["float_nan_data"]),
lambda series: np.arctan2(series, np.sin(series)),
)


def test__reduce__():
# `Series.__reduce__` will be called implicitly when lambda expressions are
# pre-processed for the distributed engine.
series_data = ["Major League Baseball", "National Basketball Association"]
abbr_md, abbr_pd = create_test_series(series_data, index=["MLB", "NBA"])

dataframe_data = {
"name": ["Mariners", "Lakers"] * 500,
"league_abbreviation": ["MLB", "NBA"] * 500,
}
teams_md, teams_pd = create_test_dfs(dataframe_data)

result_md = (
teams_md.set_index("name")
.league_abbreviation.apply(lambda abbr: abbr_md.loc[abbr])
.rename("league")
)

result_pd = (
teams_pd.set_index("name")
.league_abbreviation.apply(lambda abbr: abbr_pd.loc[abbr])
.rename("league")
)
df_equals(result_md, result_pd)