diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index fcdc75de27b..8dd791dc1a4 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -4119,6 +4119,16 @@ def wait_computations(self): """Wait for all computations to complete without materializing data.""" self._partition_mgr_cls.wait_partitions(self._partitions.flatten()) + def support_materialization_in_worker_process(self) -> bool: + """ + Whether it's possible to call function `to_pandas` during the pickling process, at the moment of recreating the object. + + Returns + ------- + bool + """ + return True + def __dataframe__(self, nan_as_null: bool = False, allow_copy: bool = True): """ Get a Modin DataFrame that implements the dataframe exchange protocol. diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index fea0c686a96..2b780e79b8f 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -27,7 +27,13 @@ import pandas from pandas._libs.lib import no_default -from modin.config import BenchmarkMode, Engine, NPartitions, ProgressBar +from modin.config import ( + BenchmarkMode, + Engine, + NPartitions, + PersistentPickle, + ProgressBar, +) from modin.core.dataframe.pandas.utils import concatenate from modin.core.storage_formats.pandas.utils import compute_chunksize from modin.error_message import ErrorMessage @@ -121,7 +127,20 @@ def preprocess_func(cls, map_func): `map_func` if the `apply` method of the `PandasDataframePartition` object you are using does not require any modification to a given function. 
""" - return cls._partition_class.preprocess_func(map_func) + old_value = PersistentPickle.get() + # When performing a function with Modin objects, it is more profitable to + # do the conversion to pandas once on the main process than several times + # on worker processes. Details: https://github.com/modin-project/modin/pull/6673/files#r1391086755 + # For Dask, otherwise there may be an error: `coroutine 'Client._gather' was never awaited` + need_update = not PersistentPickle.get() and Engine.get() != "Dask" + if need_update: + PersistentPickle.put(True) + try: + result = cls._partition_class.preprocess_func(map_func) + finally: + if need_update: + PersistentPickle.put(old_value) + return result # END Abstract Methods diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py b/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py index 77c78e29f9c..0920d963840 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py @@ -41,3 +41,26 @@ class PandasOnDaskDataframe(PandasDataframe): """ _partition_mgr_cls = PandasOnDaskDataframePartitionManager + + @classmethod + def reconnect(cls, address, attributes): # noqa: GL08 + # The main goal is to configure the client for the worker process + # using the address passed by the custom `__reduce__` function + try: + from distributed import default_client + + default_client() + except ValueError: + from distributed import Client + + # setup `default_client` for worker process + _ = Client(address) + obj = cls.__new__(cls) + obj.__dict__.update(attributes) + return obj + + def __reduce__(self): # noqa: GL08 + from distributed import default_client + + address = default_client().scheduler_info()["address"] + return self.reconnect, (address, self.__dict__) diff --git a/modin/core/execution/unidist/implementations/pandas_on_unidist/dataframe/dataframe.py 
b/modin/core/execution/unidist/implementations/pandas_on_unidist/dataframe/dataframe.py index 17e58d76b13..3241e9299e8 100644 --- a/modin/core/execution/unidist/implementations/pandas_on_unidist/dataframe/dataframe.py +++ b/modin/core/execution/unidist/implementations/pandas_on_unidist/dataframe/dataframe.py @@ -41,3 +41,7 @@ class PandasOnUnidistDataframe(PandasDataframe): """ _partition_mgr_cls = PandasOnUnidistDataframePartitionManager + + def support_materialization_in_worker_process(self) -> bool: + # more details why this is not `True` in https://github.com/modin-project/modin/pull/6673 + return False diff --git a/modin/core/storage_formats/base/query_compiler.py b/modin/core/storage_formats/base/query_compiler.py index ada3f3db3d3..4f240c6d479 100644 --- a/modin/core/storage_formats/base/query_compiler.py +++ b/modin/core/storage_formats/base/query_compiler.py @@ -325,6 +325,16 @@ def execute(self): """Wait for all computations to complete without materializing data.""" pass + def support_materialization_in_worker_process(self) -> bool: + """ + Whether it's possible to call function `to_pandas` during the pickling process, at the moment of recreating the object. 
+ + Returns + ------- + bool + """ + return self._modin_frame.support_materialization_in_worker_process() + # END Data Management Methods # To/From Pandas diff --git a/modin/experimental/core/storage_formats/hdk/query_compiler.py b/modin/experimental/core/storage_formats/hdk/query_compiler.py index b4a580478cd..31bf0c7a460 100644 --- a/modin/experimental/core/storage_formats/hdk/query_compiler.py +++ b/modin/experimental/core/storage_formats/hdk/query_compiler.py @@ -191,6 +191,9 @@ def force_import(self): # HDK-specific method self._modin_frame.force_import() + def support_materialization_in_worker_process(self) -> bool: + return True + def to_pandas(self): return self._modin_frame.to_pandas() diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index e87c6b780c1..4251e4b8f55 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -18,6 +18,7 @@ import datetime import functools import itertools +import os import re import sys import warnings @@ -3107,7 +3108,7 @@ def _getitem(self, key): # Persistance support methods - BEGIN @classmethod - def _inflate_light(cls, query_compiler): + def _inflate_light(cls, query_compiler, source_pid): """ Re-creates the object from previously-serialized lightweight representation. @@ -3117,16 +3118,23 @@ def _inflate_light(cls, query_compiler): ---------- query_compiler : BaseQueryCompiler Query compiler to use for object re-creation. + source_pid : int + Determines whether a Modin or pandas object needs to be created. + Modin objects are created only on the main process. Returns ------- DataFrame New ``DataFrame`` based on the `query_compiler`. 
""" + if os.getpid() != source_pid: + return query_compiler.to_pandas() + # The current logic does not involve creating Modin objects + # and manipulation with them in worker processes return cls(query_compiler=query_compiler) @classmethod - def _inflate_full(cls, pandas_df): + def _inflate_full(cls, pandas_df, source_pid): """ Re-creates the object from previously-serialized disk-storable representation. @@ -3134,18 +3142,29 @@ def _inflate_full(cls, pandas_df): ---------- pandas_df : pandas.DataFrame Data to use for object re-creation. + source_pid : int + Determines whether a Modin or pandas object needs to be created. + Modin objects are created only on the main process. Returns ------- DataFrame New ``DataFrame`` based on the `pandas_df`. """ + if os.getpid() != source_pid: + return pandas_df + # The current logic does not involve creating Modin objects + # and manipulation with them in worker processes return cls(data=from_pandas(pandas_df)) def __reduce__(self): self._query_compiler.finalize() - if PersistentPickle.get(): - return self._inflate_full, (self._to_pandas(),) - return self._inflate_light, (self._query_compiler,) + pid = os.getpid() + if ( + PersistentPickle.get() + or not self._query_compiler.support_materialization_in_worker_process() + ): + return self._inflate_full, (self._to_pandas(), pid) + return self._inflate_light, (self._query_compiler, pid) # Persistance support methods - END diff --git a/modin/pandas/series.py b/modin/pandas/series.py index 18cb621cdae..849bd0ff656 100644 --- a/modin/pandas/series.py +++ b/modin/pandas/series.py @@ -15,6 +15,7 @@ from __future__ import annotations +import os import warnings from typing import IO, TYPE_CHECKING, Hashable, Optional, Union @@ -2497,7 +2498,7 @@ def _repartition(self): # Persistance support methods - BEGIN @classmethod - def _inflate_light(cls, query_compiler, name): + def _inflate_light(cls, query_compiler, name, source_pid): """ Re-creates the object from previously-serialized 
lightweight representation. @@ -2509,16 +2510,23 @@ def _inflate_light(cls, query_compiler, name): Query compiler to use for object re-creation. name : str The name to give to the new object. + source_pid : int + Determines whether a Modin or pandas object needs to be created. + Modin objects are created only on the main process. Returns ------- Series New Series based on the `query_compiler`. """ + if os.getpid() != source_pid: + return query_compiler.to_pandas() + # The current logic does not involve creating Modin objects + # and manipulation with them in worker processes return cls(query_compiler=query_compiler, name=name) @classmethod - def _inflate_full(cls, pandas_series): + def _inflate_full(cls, pandas_series, source_pid): """ Re-creates the object from previously-serialized disk-storable representation. @@ -2526,18 +2534,29 @@ def _inflate_full(cls, pandas_series): ---------- pandas_series : pandas.Series Data to use for object re-creation. + source_pid : int + Determines whether a Modin or pandas object needs to be created. + Modin objects are created only on the main process. Returns ------- Series New Series based on the `pandas_series`. 
""" + if os.getpid() != source_pid: + return pandas_series + # The current logic does not involve creating Modin objects + # and manipulation with them in worker processes return cls(data=pandas_series) def __reduce__(self): self._query_compiler.finalize() - if PersistentPickle.get(): - return self._inflate_full, (self._to_pandas(),) - return self._inflate_light, (self._query_compiler, self.name) + pid = os.getpid() + if ( + PersistentPickle.get() + or not self._query_compiler.support_materialization_in_worker_process() + ): + return self._inflate_full, (self._to_pandas(), pid) + return self._inflate_light, (self._query_compiler, self.name, pid) # Persistance support methods - END diff --git a/modin/pandas/test/dataframe/test_pickle.py b/modin/pandas/test/dataframe/test_pickle.py index a3ee843daa3..aed6b710b4b 100644 --- a/modin/pandas/test/dataframe/test_pickle.py +++ b/modin/pandas/test/dataframe/test_pickle.py @@ -18,7 +18,7 @@ import modin.pandas as pd from modin.config import PersistentPickle -from modin.pandas.test.utils import df_equals +from modin.pandas.test.utils import create_test_dfs, df_equals @pytest.fixture @@ -47,6 +47,33 @@ def test_dataframe_pickle(modin_df, persistent): df_equals(modin_df, other) +def test__reduce__(): + # `DataFrame.__reduce__` will be called implicitly when lambda expressions are + # pre-processed for the distributed engine. 
+ dataframe_data = ["Major League Baseball", "National Basketball Association"] + abbr_md, abbr_pd = create_test_dfs(dataframe_data, index=["MLB", "NBA"]) + # `abbr_md` is captured by the lambdas below, forcing DataFrame serialization + + dataframe_data = { + "name": ["Mariners", "Lakers"] * 500, + "league_abbreviation": ["MLB", "NBA"] * 500, + } + teams_md, teams_pd = create_test_dfs(dataframe_data) + + result_md = ( + teams_md.set_index("name") + .league_abbreviation.apply(lambda abbr: abbr_md[0].loc[abbr]) + .rename("league") + ) + + result_pd = ( + teams_pd.set_index("name") + .league_abbreviation.apply(lambda abbr: abbr_pd[0].loc[abbr]) + .rename("league") + ) + df_equals(result_md, result_pd) + + def test_column_pickle(modin_column, modin_df, persistent): dmp = pickle.dumps(modin_column) other = pickle.loads(dmp) diff --git a/modin/pandas/test/test_series.py b/modin/pandas/test/test_series.py index f193a8efb05..1b2c5eb0a5a 100644 --- a/modin/pandas/test/test_series.py +++ b/modin/pandas/test/test_series.py @@ -47,6 +47,7 @@ bool_arg_keys, bool_arg_values, categories_equals, + create_test_dfs, default_to_pandas_ignore_string, df_equals, df_equals_with_non_stable_indices, @@ -4794,3 +4795,29 @@ def test_binary_numpy_universal_function_issue_6483(): *create_test_series(test_data["float_nan_data"]), lambda series: np.arctan2(series, np.sin(series)), ) + + +def test__reduce__(): + # `Series.__reduce__` will be called implicitly when lambda expressions are + # pre-processed for the distributed engine. 
+ series_data = ["Major League Baseball", "National Basketball Association"] + abbr_md, abbr_pd = create_test_series(series_data, index=["MLB", "NBA"]) + + dataframe_data = { + "name": ["Mariners", "Lakers"] * 500, + "league_abbreviation": ["MLB", "NBA"] * 500, + } + teams_md, teams_pd = create_test_dfs(dataframe_data) + + result_md = ( + teams_md.set_index("name") + .league_abbreviation.apply(lambda abbr: abbr_md.loc[abbr]) + .rename("league") + ) + + result_pd = ( + teams_pd.set_index("name") + .league_abbreviation.apply(lambda abbr: abbr_pd.loc[abbr]) + .rename("league") + ) + df_equals(result_md, result_pd)