From 6f4088a503d8437a6aeff8d640e07a2344aad480 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Sat, 14 Oct 2023 01:24:06 +0200 Subject: [PATCH] add new methods for internals Signed-off-by: Anatoly Myachev --- .../dataframe/pandas/dataframe/dataframe.py | 4 +++ .../pandas/partitioning/partition_manager.py | 3 +- .../partitioning/partition_manager.py | 1 + .../partitioning/partition_manager.py | 1 + .../partitioning/partition_manager.py | 1 + .../storage_formats/base/query_compiler.py | 5 ++++ .../storage_formats/pandas/query_compiler.py | 3 ++ .../storage_formats/hdk/query_compiler.py | 3 ++ modin/test/test_executions_api.py | 1 + modin/utils.py | 29 +++++++++++-------- 10 files changed, 38 insertions(+), 13 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 29eaaad3bea..320c35e224c 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -4106,6 +4106,10 @@ def finalize(self): """ self._partition_mgr_cls.finalize(self._partitions) + def wait_computations(self): + """Wait for all computations to complete without materializing data.""" + self._partition_mgr_cls.wait_partitions(self._partitions) + def __dataframe__(self, nan_as_null: bool = False, allow_copy: bool = True): """ Get a Modin DataFrame that implements the dataframe exchange protocol. 
diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index 904a2809b4f..40ce8715a91 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -74,7 +74,7 @@ def wait(cls, *args, **kwargs): cls.finalize(partitions) # The partition manager invokes the relevant .wait() method under # the hood, which should wait in parallel for all computations to finish - cls.wait_partitions(partitions.flatten()) + cls.wait_partitions(partitions) return result return wait @@ -930,6 +930,7 @@ def wait_partitions(cls, partitions): This method should be implemented in a more efficient way for engines that supports waiting on objects in parallel. """ + partitions = partitions.flatten() for partition in partitions: partition.wait() diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition_manager.py b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition_manager.py index f045c6ef392..c0e9d1d7a04 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition_manager.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition_manager.py @@ -46,6 +46,7 @@ def wait_partitions(cls, partitions): partitions : np.ndarray NumPy array with ``PandasDataframePartition``-s. 
""" + partitions = partitions.flatten() DaskWrapper.wait( [block for partition in partitions for block in partition.list_of_blocks] ) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py index 26164984fa7..fbd947bcb5e 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py @@ -47,6 +47,7 @@ def wait_partitions(cls, partitions): partitions : np.ndarray NumPy array with ``PandasDataframePartition``-s. """ + partitions = partitions.flatten() RayWrapper.wait( [block for partition in partitions for block in partition.list_of_blocks] ) diff --git a/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/partition_manager.py b/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/partition_manager.py index 181de9f3f72..23576964d86 100644 --- a/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/partition_manager.py +++ b/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/partition_manager.py @@ -47,6 +47,7 @@ def wait_partitions(cls, partitions): partitions : np.ndarray NumPy array with ``PandasDataframePartition``-s. 
""" + partitions = partitions.flatten() UnidistWrapper.wait( [block for partition in partitions for block in partition.list_of_blocks] ) diff --git a/modin/core/storage_formats/base/query_compiler.py b/modin/core/storage_formats/base/query_compiler.py index ec31f98bd25..46656274dc9 100644 --- a/modin/core/storage_formats/base/query_compiler.py +++ b/modin/core/storage_formats/base/query_compiler.py @@ -317,6 +317,11 @@ def finalize(self): """Finalize constructing the dataframe calling all deferred functions which were used to build it.""" pass + @abc.abstractmethod + def wait_computations(self): + """Wait for all computations to complete without materializing data.""" + pass + # END Data Management Methods # To/From Pandas diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index aef305d6c17..d8a5f2fc1a7 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -278,6 +278,9 @@ def lazy_execution(self): def finalize(self): self._modin_frame.finalize() + def wait_computations(self): + self._modin_frame.wait_computations() + def to_pandas(self): return self._modin_frame.to_pandas() diff --git a/modin/experimental/core/storage_formats/hdk/query_compiler.py b/modin/experimental/core/storage_formats/hdk/query_compiler.py index 439af6246d5..8f4b9acaf4d 100644 --- a/modin/experimental/core/storage_formats/hdk/query_compiler.py +++ b/modin/experimental/core/storage_formats/hdk/query_compiler.py @@ -183,6 +183,9 @@ def finalize(self): # TODO: implement this for HDK storage format raise NotImplementedError() + def wait_computations(self): + raise NotImplementedError() + def to_pandas(self): return self._modin_frame.to_pandas() diff --git a/modin/test/test_executions_api.py b/modin/test/test_executions_api.py index b60afd1ad59..7d7c21f120e 100644 --- a/modin/test/test_executions_api.py +++ b/modin/test/test_executions_api.py @@ -25,6 +25,7 @@ 
def test_base_abstract_methods(): "__init__", "free", "finalize", + "wait_computations", "to_pandas", "from_pandas", "from_arrow", diff --git a/modin/utils.py b/modin/utils.py index 0bf8cb14f3f..df61dfeae3c 100644 --- a/modin/utils.py +++ b/modin/utils.py @@ -606,23 +606,32 @@ def try_cast_to_pandas(obj: Any, squeeze: bool = False) -> Any: return obj -def trigger_import(df: Any): +def trigger_import(obj: Any) -> None: """ Trigger import execution for DataFrames obtained by HDK engine. Parameters ---------- - df : Any - DataFrames to trigger import. + obj : Any + An object whose deferred data import should be triggered. """ - if hasattr(df, "_query_compiler"): - modin_frame = df._query_compiler._modin_frame + if hasattr(obj, "_query_compiler"): + modin_frame = obj._query_compiler._modin_frame if hasattr(modin_frame, "force_import"): modin_frame.force_import() -def wait_computations(obj: Any, *, trigger_hdk_import: bool = False): - """Make sure obj's computations finished.""" +def wait_computations(obj: Any, *, trigger_hdk_import: bool = False) -> None: + """ + Trigger the lazy computations of `obj`, if any, and wait for them to complete. + + Parameters + ---------- + obj : Any + An object whose lazy computations should be triggered. + trigger_hdk_import : bool, default: False + Whether to trigger import execution. Makes sense only for the HDK storage format. + """ if not hasattr(obj, "_query_compiler"): return @@ -634,11 +643,7 @@ def wait_computations(obj: Any, *, trigger_hdk_import: bool = False): return obj._query_compiler.finalize() - - partitions = obj._query_compiler._modin_frame._partitions.flatten() - mgr_cls = obj._query_compiler._modin_frame._partition_mgr_cls - if len(partitions) and hasattr(mgr_cls, "wait_partitions"): - mgr_cls.wait_partitions(partitions) + obj._query_compiler.wait_computations() def wrap_into_list(*args: Any, skipna: bool = True) -> List[Any]: