Skip to content

Commit

Permalink
FIX-#6594: fix usage of Modin objects inside UDFs for apply (#6673)
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoly Myachev <[email protected]>
  • Loading branch information
anmyachev authored Nov 14, 2023
1 parent 41ecc92 commit 7de7b92
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 13 deletions.
10 changes: 10 additions & 0 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4119,6 +4119,16 @@ def wait_computations(self):
"""Wait for all computations to complete without materializing data."""
self._partition_mgr_cls.wait_partitions(self._partitions.flatten())

def support_materialization_in_worker_process(self) -> bool:
    """
    Whether it's possible to call function `to_pandas` during the pickling process, at the moment of recreating the object.

    Returns
    -------
    bool
        Always ``True`` for this frame implementation: materializing the
        data inside a worker process is safe here.
    """
    return True

def __dataframe__(self, nan_as_null: bool = False, allow_copy: bool = True):
"""
Get a Modin DataFrame that implements the dataframe exchange protocol.
Expand Down
23 changes: 21 additions & 2 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@
import pandas
from pandas._libs.lib import no_default

from modin.config import BenchmarkMode, Engine, NPartitions, ProgressBar
from modin.config import (
BenchmarkMode,
Engine,
NPartitions,
PersistentPickle,
ProgressBar,
)
from modin.core.dataframe.pandas.utils import concatenate
from modin.core.storage_formats.pandas.utils import compute_chunksize
from modin.error_message import ErrorMessage
Expand Down Expand Up @@ -121,7 +127,20 @@ def preprocess_func(cls, map_func):
`map_func` if the `apply` method of the `PandasDataframePartition` object
you are using does not require any modification to a given function.
"""
return cls._partition_class.preprocess_func(map_func)
old_value = PersistentPickle.get()
# When performing a function with Modin objects, it is more profitable to
# do the conversion to pandas once on the main process than several times
# on worker processes. Details: https://github.com/modin-project/modin/pull/6673/files#r1391086755
# For Dask, otherwise there may be an error: `coroutine 'Client._gather' was never awaited`
need_update = not PersistentPickle.get() and Engine.get() != "Dask"
if need_update:
PersistentPickle.put(True)
try:
result = cls._partition_class.preprocess_func(map_func)
finally:
if need_update:
PersistentPickle.put(old_value)
return result

# END Abstract Methods

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,26 @@ class PandasOnDaskDataframe(PandasDataframe):
"""

_partition_mgr_cls = PandasOnDaskDataframePartitionManager

@classmethod
def reconnect(cls, address, attributes):
    """
    Recreate the dataframe object, restoring a Dask client if needed.

    Used as the reconstructor returned by ``__reduce__``: when unpickling
    happens in a process without a default Dask client (e.g. a worker),
    a client connected to `address` is created first.

    Parameters
    ----------
    address : str
        Address of the Dask scheduler to connect to when no default
        client is available.
    attributes : dict
        The ``__dict__`` of the original dataframe.

    Returns
    -------
    PandasOnDaskDataframe
        The reconstructed dataframe object.
    """
    # The main goal is to configure the client for the worker process
    # using the address passed by the custom `__reduce__` function
    try:
        from distributed import default_client

        default_client()
    except ValueError:
        from distributed import Client

        # setup `default_client` for worker process
        _ = Client(address)
    # Bypass __init__ and restore the serialized state directly.
    obj = cls.__new__(cls)
    obj.__dict__.update(attributes)
    return obj

def __reduce__(self):
    """
    Support pickling by pairing `reconnect` with the scheduler address.

    Returns
    -------
    tuple
        Reconstructor callable and its argument tuple, as pickle expects.
    """
    from distributed import default_client

    # Ship the scheduler address so `reconnect` can build a client in a
    # process that has none.
    client = default_client()
    scheduler_address = client.scheduler_info()["address"]
    return self.reconnect, (scheduler_address, self.__dict__)
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,7 @@ class PandasOnUnidistDataframe(PandasDataframe):
"""

_partition_mgr_cls = PandasOnUnidistDataframePartitionManager

def support_materialization_in_worker_process(self) -> bool:
    """
    Whether it's possible to call function `to_pandas` during the pickling process, at the moment of recreating the object.

    Returns
    -------
    bool
        Always ``False`` for the unidist backend.
    """
    # more details why this is not `True` in https://github.com/modin-project/modin/pull/6673
    return False
10 changes: 10 additions & 0 deletions modin/core/storage_formats/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,16 @@ def execute(self):
"""Wait for all computations to complete without materializing data."""
pass

def support_materialization_in_worker_process(self) -> bool:
    """
    Whether it's possible to call function `to_pandas` during the pickling process, at the moment of recreating the object.

    Returns
    -------
    bool
    """
    # Delegate to the underlying frame, which knows whether its execution
    # engine allows materialization outside the main process.
    return self._modin_frame.support_materialization_in_worker_process()

# END Data Management Methods

# To/From Pandas
Expand Down
3 changes: 3 additions & 0 deletions modin/experimental/core/storage_formats/hdk/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ def force_import(self):
# HDK-specific method
self._modin_frame.force_import()

def support_materialization_in_worker_process(self) -> bool:
    """
    Whether it's possible to call function `to_pandas` during the pickling process, at the moment of recreating the object.

    Returns
    -------
    bool
        Always ``True`` for the HDK storage format.
    """
    return True

def to_pandas(self):
return self._modin_frame.to_pandas()

Expand Down
29 changes: 24 additions & 5 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import datetime
import functools
import itertools
import os
import re
import sys
import warnings
Expand Down Expand Up @@ -3107,7 +3108,7 @@ def _getitem(self, key):

# Persistence support methods - BEGIN
@classmethod
def _inflate_light(cls, query_compiler, source_pid):
    """
    Re-creates the object from previously-serialized lightweight representation.

    Parameters
    ----------
    query_compiler : BaseQueryCompiler
        Query compiler to use for object re-creation.
    source_pid : int
        Determines whether a Modin or pandas object needs to be created.
        Modin objects are created only on the main process.

    Returns
    -------
    DataFrame
        New ``DataFrame`` based on the `query_compiler`.
    """
    # A differing pid means we are unpickling inside a worker process;
    # return a plain pandas object there instead of a Modin one.
    if os.getpid() != source_pid:
        return query_compiler.to_pandas()
    # The current logic does not involve creating Modin objects
    # and manipulation with them in worker processes
    return cls(query_compiler=query_compiler)

@classmethod
def _inflate_full(cls, pandas_df, source_pid):
    """
    Re-creates the object from previously-serialized disk-storable representation.

    Parameters
    ----------
    pandas_df : pandas.DataFrame
        Data to use for object re-creation.
    source_pid : int
        Determines whether a Modin or pandas object needs to be created.
        Modin objects are created only on the main process.

    Returns
    -------
    DataFrame
        New ``DataFrame`` based on the `pandas_df`.
    """
    # A differing pid means we are unpickling inside a worker process;
    # hand back the already-materialized pandas object as-is.
    if os.getpid() != source_pid:
        return pandas_df
    # The current logic does not involve creating Modin objects
    # and manipulation with them in worker processes
    return cls(data=from_pandas(pandas_df))

def __reduce__(self):
    """
    Support pickling, choosing between light and full serialization.

    Returns
    -------
    tuple
        Reconstructor callable and its argument tuple, as pickle expects.
    """
    self._query_compiler.finalize()
    # Record the pid of the serializing (main) process so the inflate
    # helpers can detect whether they run inside a worker process.
    pid = os.getpid()
    if (
        PersistentPickle.get()
        or not self._query_compiler.support_materialization_in_worker_process()
    ):
        return self._inflate_full, (self._to_pandas(), pid)
    return self._inflate_light, (self._query_compiler, pid)

# Persistence support methods - END
29 changes: 24 additions & 5 deletions modin/pandas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from __future__ import annotations

import os
import warnings
from typing import IO, TYPE_CHECKING, Hashable, Optional, Union

Expand Down Expand Up @@ -2497,7 +2498,7 @@ def _repartition(self):

# Persistence support methods - BEGIN
@classmethod
def _inflate_light(cls, query_compiler, name, source_pid):
    """
    Re-creates the object from previously-serialized lightweight representation.

    Parameters
    ----------
    query_compiler : BaseQueryCompiler
        Query compiler to use for object re-creation.
    name : str
        The name to give to the new object.
    source_pid : int
        Determines whether a Modin or pandas object needs to be created.
        Modin objects are created only on the main process.

    Returns
    -------
    Series
        New Series based on the `query_compiler`.
    """
    # A differing pid means we are unpickling inside a worker process;
    # return a plain pandas object there instead of a Modin one.
    if os.getpid() != source_pid:
        return query_compiler.to_pandas()
    # The current logic does not involve creating Modin objects
    # and manipulation with them in worker processes
    return cls(query_compiler=query_compiler, name=name)

@classmethod
def _inflate_full(cls, pandas_series, source_pid):
    """
    Re-creates the object from previously-serialized disk-storable representation.

    Parameters
    ----------
    pandas_series : pandas.Series
        Data to use for object re-creation.
    source_pid : int
        Determines whether a Modin or pandas object needs to be created.
        Modin objects are created only on the main process.

    Returns
    -------
    Series
        New Series based on the `pandas_series`.
    """
    # A differing pid means we are unpickling inside a worker process;
    # hand back the already-materialized pandas object as-is.
    if os.getpid() != source_pid:
        return pandas_series
    # The current logic does not involve creating Modin objects
    # and manipulation with them in worker processes
    return cls(data=pandas_series)

def __reduce__(self):
    """
    Support pickling, choosing between light and full serialization.

    Returns
    -------
    tuple
        Reconstructor callable and its argument tuple, as pickle expects.
    """
    self._query_compiler.finalize()
    # Record the pid of the serializing (main) process so the inflate
    # helpers can detect whether they run inside a worker process.
    pid = os.getpid()
    if (
        PersistentPickle.get()
        or not self._query_compiler.support_materialization_in_worker_process()
    ):
        return self._inflate_full, (self._to_pandas(), pid)
    return self._inflate_light, (self._query_compiler, self.name, pid)

# Persistence support methods - END
29 changes: 28 additions & 1 deletion modin/pandas/test/dataframe/test_pickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import modin.pandas as pd
from modin.config import PersistentPickle
from modin.pandas.test.utils import df_equals
from modin.pandas.test.utils import create_test_dfs, df_equals


@pytest.fixture
Expand Down Expand Up @@ -47,6 +47,33 @@ def test_dataframe_pickle(modin_df, persistent):
df_equals(modin_df, other)


def test__reduce__():
    # `DataFrame.__reduce__` will be called implicitly when lambda expressions are
    # pre-processed for the distributed engine.
    dataframe_data = ["Major League Baseball", "National Basketball Association"]
    abbr_md, abbr_pd = create_test_dfs(dataframe_data, index=["MLB", "NBA"])

    dataframe_data = {
        "name": ["Mariners", "Lakers"] * 500,
        "league_abbreviation": ["MLB", "NBA"] * 500,
    }
    teams_md, teams_pd = create_test_dfs(dataframe_data)

    # The UDF captures the Modin dataframe `abbr_md`, so it is pickled
    # together with the lambda when the function is shipped to workers.
    result_md = (
        teams_md.set_index("name")
        .league_abbreviation.apply(lambda abbr: abbr_md[0].loc[abbr])
        .rename("league")
    )

    # Same computation with plain pandas objects for comparison.
    result_pd = (
        teams_pd.set_index("name")
        .league_abbreviation.apply(lambda abbr: abbr_pd[0].loc[abbr])
        .rename("league")
    )
    df_equals(result_md, result_pd)


def test_column_pickle(modin_column, modin_df, persistent):
dmp = pickle.dumps(modin_column)
other = pickle.loads(dmp)
Expand Down
27 changes: 27 additions & 0 deletions modin/pandas/test/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
bool_arg_keys,
bool_arg_values,
categories_equals,
create_test_dfs,
default_to_pandas_ignore_string,
df_equals,
df_equals_with_non_stable_indices,
Expand Down Expand Up @@ -4794,3 +4795,29 @@ def test_binary_numpy_universal_function_issue_6483():
*create_test_series(test_data["float_nan_data"]),
lambda series: np.arctan2(series, np.sin(series)),
)


def test__reduce__():
    # `Series.__reduce__` will be called implicitly when lambda expressions are
    # pre-processed for the distributed engine.
    series_data = ["Major League Baseball", "National Basketball Association"]
    abbr_md, abbr_pd = create_test_series(series_data, index=["MLB", "NBA"])

    dataframe_data = {
        "name": ["Mariners", "Lakers"] * 500,
        "league_abbreviation": ["MLB", "NBA"] * 500,
    }
    teams_md, teams_pd = create_test_dfs(dataframe_data)

    # The UDF captures the Modin series `abbr_md`, so it is pickled
    # together with the lambda when the function is shipped to workers.
    result_md = (
        teams_md.set_index("name")
        .league_abbreviation.apply(lambda abbr: abbr_md.loc[abbr])
        .rename("league")
    )

    # Same computation with plain pandas objects for comparison.
    result_pd = (
        teams_pd.set_index("name")
        .league_abbreviation.apply(lambda abbr: abbr_pd.loc[abbr])
        .rename("league")
    )
    df_equals(result_md, result_pd)

0 comments on commit 7de7b92

Please sign in to comment.