Skip to content

Commit

Permalink
fix 'DataFrame.__reduce__'
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoly Myachev <[email protected]>
  • Loading branch information
anmyachev committed Oct 24, 2023
1 parent e200d31 commit 468c6d9
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 7 deletions.
29 changes: 24 additions & 5 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import datetime
import functools
import itertools
import os
import re
import sys
import warnings
Expand Down Expand Up @@ -3104,7 +3105,7 @@ def _getitem(self, key):

# Persistence support methods - BEGIN
@classmethod
def _inflate_light(cls, query_compiler):
def _inflate_light(cls, query_compiler, source_pid):
"""
Re-creates the object from previously-serialized lightweight representation.
Expand All @@ -3114,35 +3115,53 @@ def _inflate_light(cls, query_compiler):
----------
query_compiler : BaseQueryCompiler
Query compiler to use for object re-creation.
source_pid : int
Determines whether a Modin or pandas object needs to be created.
Modin objects are created only on the main process.
Returns
-------
DataFrame
New ``DataFrame`` based on the `query_compiler`.
"""
if os.getpid() != source_pid:
return query_compiler.to_pandas()
# The current logic does not involve creating Modin objects
# or manipulating them in worker processes
return cls(query_compiler=query_compiler)

@classmethod
def _inflate_full(cls, pandas_df):
def _inflate_full(cls, pandas_df, source_pid):
"""
Re-creates the object from previously-serialized disk-storable representation.
Parameters
----------
pandas_df : pandas.DataFrame
Data to use for object re-creation.
source_pid : int
Determines whether a Modin or pandas object needs to be created.
Modin objects are created only on the main process.
Returns
-------
DataFrame
New ``DataFrame`` based on the `pandas_df`.
"""
if os.getpid() != source_pid:
return pandas_df

Check warning on line 3152 in modin/pandas/dataframe.py

View check run for this annotation

Codecov / codecov/patch

modin/pandas/dataframe.py#L3152

Added line #L3152 was not covered by tests
# The current logic does not involve creating Modin objects
# or manipulating them in worker processes
return cls(data=from_pandas(pandas_df))

def __reduce__(self):
self._query_compiler.finalize()
if PersistentPickle.get():
return self._inflate_full, (self._to_pandas(),)
return self._inflate_light, (self._query_compiler,)
pid = os.getpid()
if (
PersistentPickle.get()
or not self._query_compiler.support_materialization_in_worker_process()
):
return self._inflate_full, (self._to_pandas(), pid)
return self._inflate_light, (self._query_compiler, pid)

# Persistence support methods - END
4 changes: 3 additions & 1 deletion modin/pandas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -2533,7 +2533,7 @@ def _inflate_full(cls, pandas_series, source_pid):
pandas_series : pandas.Series
Data to use for object re-creation.
source_pid : int
Determines whether a Modin or Pandas object needs to be created.
Determines whether a Modin or pandas object needs to be created.
Modin objects are created only on the main process.
Returns
Expand All @@ -2543,6 +2543,8 @@ def _inflate_full(cls, pandas_series, source_pid):
"""
if os.getpid() != source_pid:
return pandas_series

Check warning on line 2545 in modin/pandas/series.py

View check run for this annotation

Codecov / codecov/patch

modin/pandas/series.py#L2545

Added line #L2545 was not covered by tests
# The current logic does not involve creating Modin objects
# or manipulating them in worker processes
return cls(data=pandas_series)

def __reduce__(self):
Expand Down
29 changes: 28 additions & 1 deletion modin/pandas/test/dataframe/test_pickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import modin.pandas as pd
from modin.config import PersistentPickle
from modin.pandas.test.utils import df_equals
from modin.pandas.test.utils import create_test_dfs, df_equals


@pytest.fixture
Expand Down Expand Up @@ -47,6 +47,33 @@ def test_dataframe_pickle(modin_df, persistent):
df_equals(modin_df, other)


def test__reduce__():
    """
    Check that ``apply`` with a lambda capturing a Modin object matches pandas.

    The lambda passed to ``apply`` closes over ``abbr_md``, so the captured
    object has to be serialized together with the lambda; this exercises the
    implicit ``__reduce__`` path rather than an explicit ``pickle.dumps`` call.
    """
    # `DataFrame.__reduce__` will be called implicitly when lambda expressions
    # are pre-processed for the distributed engine: the lambda below captures
    # `abbr_md` (a frame, not a Series), and `abbr_md[0]` is only evaluated
    # once the lambda runs.
    abbr_data = ["Major League Baseball", "National Basketball Association"]
    abbr_md, abbr_pd = create_test_dfs(abbr_data, index=["MLB", "NBA"])

    # 1000 rows in total (2 values * 500 repetitions).
    teams_data = {
        "name": ["Mariners", "Lakers"] * 500,
        "league_abbreviation": ["MLB", "NBA"] * 500,
    }
    teams_md, teams_pd = create_test_dfs(teams_data)

    result_md = (
        teams_md.set_index("name")
        .league_abbreviation.apply(lambda abbr: abbr_md[0].loc[abbr])
        .rename("league")
    )

    result_pd = (
        teams_pd.set_index("name")
        .league_abbreviation.apply(lambda abbr: abbr_pd[0].loc[abbr])
        .rename("league")
    )
    df_equals(result_md, result_pd)


def test_column_pickle(modin_column, modin_df, persistent):
dmp = pickle.dumps(modin_column)
other = pickle.loads(dmp)
Expand Down

0 comments on commit 468c6d9

Please sign in to comment.