From 144a4bc3c7cbe0c91e3e129a3a4e122e5ea52953 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Sat, 14 Oct 2023 00:46:31 +0200 Subject: [PATCH 01/10] FEAT-#5221: add 'wait_computations' to trigger lazy computations and wait for them to complete Signed-off-by: Anatoly Myachev --- modin/conftest.py | 3 ++ .../dataframe/pandas/dataframe/dataframe.py | 4 ++ .../pandas/partitioning/partition_manager.py | 3 +- .../partitioning/partition_manager.py | 1 + .../partitioning/partition_manager.py | 1 + .../partitioning/partition_manager.py | 1 + .../storage_formats/base/query_compiler.py | 5 +++ .../storage_formats/pandas/query_compiler.py | 3 ++ .../storage_formats/hdk/query_compiler.py | 3 ++ modin/test/test_executions_api.py | 1 + modin/test/test_utils.py | 12 ++++++ modin/utils.py | 40 +++++++++++++++++++ 12 files changed, 76 insertions(+), 1 deletion(-) diff --git a/modin/conftest.py b/modin/conftest.py index 87cc0083792..eeeeb49f536 100644 --- a/modin/conftest.py +++ b/modin/conftest.py @@ -175,6 +175,9 @@ def __init__(self, modin_frame): def finalize(self): self._modin_frame.finalize() + def wait_computations(self): + self._modin_frame.wait_computations() + @classmethod def from_pandas(cls, df, data_cls): return cls(data_cls.from_pandas(df)) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 29eaaad3bea..320c35e224c 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -4106,6 +4106,10 @@ def finalize(self): """ self._partition_mgr_cls.finalize(self._partitions) + def wait_computations(self): + """Wait for all computations to complete without materializing data.""" + self._partition_mgr_cls.wait_partitions(self._partitions) + def __dataframe__(self, nan_as_null: bool = False, allow_copy: bool = True): """ Get a Modin DataFrame that implements the dataframe exchange protocol. 
diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index fea0c686a96..8511068eccb 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -74,7 +74,7 @@ def wait(cls, *args, **kwargs): cls.finalize(partitions) # The partition manager invokes the relevant .wait() method under # the hood, which should wait in parallel for all computations to finish - cls.wait_partitions(partitions.flatten()) + cls.wait_partitions(partitions) return result return wait @@ -952,6 +952,7 @@ def wait_partitions(cls, partitions): This method should be implemented in a more efficient way for engines that supports waiting on objects in parallel. """ + partitions = partitions.flatten() for partition in partitions: partition.wait() diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition_manager.py b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition_manager.py index f045c6ef392..c0e9d1d7a04 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition_manager.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition_manager.py @@ -46,6 +46,7 @@ def wait_partitions(cls, partitions): partitions : np.ndarray NumPy array with ``PandasDataframePartition``-s. 
""" + partitions = partitions.flatten() DaskWrapper.wait( [block for partition in partitions for block in partition.list_of_blocks] ) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py index 074e60a30e2..60a72aa4020 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py @@ -53,6 +53,7 @@ def wait_partitions(cls, partitions): partitions : np.ndarray NumPy array with ``PandasDataframePartition``-s. """ + partitions = partitions.flatten() RayWrapper.wait( [block for partition in partitions for block in partition.list_of_blocks] ) diff --git a/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/partition_manager.py b/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/partition_manager.py index 181de9f3f72..23576964d86 100644 --- a/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/partition_manager.py +++ b/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/partition_manager.py @@ -47,6 +47,7 @@ def wait_partitions(cls, partitions): partitions : np.ndarray NumPy array with ``PandasDataframePartition``-s. 
""" + partitions = partitions.flatten() UnidistWrapper.wait( [block for partition in partitions for block in partition.list_of_blocks] ) diff --git a/modin/core/storage_formats/base/query_compiler.py b/modin/core/storage_formats/base/query_compiler.py index ec31f98bd25..46656274dc9 100644 --- a/modin/core/storage_formats/base/query_compiler.py +++ b/modin/core/storage_formats/base/query_compiler.py @@ -317,6 +317,11 @@ def finalize(self): """Finalize constructing the dataframe calling all deferred functions which were used to build it.""" pass + @abc.abstractmethod + def wait_computations(self): + """Wait for all computations to complete without materializing data.""" + pass + # END Data Management Methods # To/From Pandas diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 0fc992bb05b..8e9127314a4 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -278,6 +278,9 @@ def lazy_execution(self): def finalize(self): self._modin_frame.finalize() + def wait_computations(self): + self._modin_frame.wait_computations() + def to_pandas(self): return self._modin_frame.to_pandas() diff --git a/modin/experimental/core/storage_formats/hdk/query_compiler.py b/modin/experimental/core/storage_formats/hdk/query_compiler.py index 439af6246d5..8f4b9acaf4d 100644 --- a/modin/experimental/core/storage_formats/hdk/query_compiler.py +++ b/modin/experimental/core/storage_formats/hdk/query_compiler.py @@ -183,6 +183,9 @@ def finalize(self): # TODO: implement this for HDK storage format raise NotImplementedError() + def wait_computations(self): + raise NotImplementedError() + def to_pandas(self): return self._modin_frame.to_pandas() diff --git a/modin/test/test_executions_api.py b/modin/test/test_executions_api.py index b60afd1ad59..7d7c21f120e 100644 --- a/modin/test/test_executions_api.py +++ b/modin/test/test_executions_api.py @@ -25,6 +25,7 @@ 
def test_base_abstract_methods(): "__init__", "free", "finalize", + "wait_computations", "to_pandas", "from_pandas", "from_arrow", diff --git a/modin/test/test_utils.py b/modin/test/test_utils.py index 3337385d139..f5e617a9586 100644 --- a/modin/test/test_utils.py +++ b/modin/test/test_utils.py @@ -13,7 +13,9 @@ import json from textwrap import dedent, indent +from unittest.mock import Mock, patch +import numpy as np import pandas import pytest @@ -336,3 +338,13 @@ def test_assert_dtypes_equal(): df1 = df1.astype({"a": "str"}) with pytest.raises(AssertionError): assert_dtypes_equal(df1, df2) + + +def test_wait_computations(): + df = pd.DataFrame(np.random.rand(100, 64)) + partitions = df._query_compiler._modin_frame._partitions + mgr_cls = df._query_compiler._modin_frame._partition_mgr_cls + with patch.object(mgr_cls, "wait_partitions", new=Mock()): + modin.utils.wait_computations(df) + mgr_cls.wait_partitions.assert_called_once() + assert (mgr_cls.wait_partitions.call_args[0] == partitions).all() diff --git a/modin/utils.py b/modin/utils.py index 6aba77f530f..df61dfeae3c 100644 --- a/modin/utils.py +++ b/modin/utils.py @@ -606,6 +606,46 @@ def try_cast_to_pandas(obj: Any, squeeze: bool = False) -> Any: return obj +def trigger_import(obj: Any) -> None: + """ + Trigger import execution for DataFrames obtained by HDK engine. + + Parameters + ---------- + obj : Any + An object to trigger deferred data import. + """ + if hasattr(obj, "_query_compiler"): + modin_frame = obj._query_compiler._modin_frame + if hasattr(modin_frame, "force_import"): + modin_frame.force_import() + + +def wait_computations(obj: Any, *, trigger_hdk_import: bool = False) -> None: + """ + Trigger the `obj` lazy computations, if any, and wait for them to complete. + + Parameters + ---------- + obj : Any + An object to trigger lazy computations. + trigger_hdk_import : bool, default: False + Trigger import execution. Makes sense only for HDK storage format. 
+ """ + if not hasattr(obj, "_query_compiler"): + return + + if Engine.get() == "Native": + if trigger_hdk_import: + trigger_import(obj) + else: + obj._query_compiler._modin_frame._execute() + return + + obj._query_compiler.finalize() + obj._query_compiler.wait_computations() + + def wrap_into_list(*args: Any, skipna: bool = True) -> List[Any]: """ Wrap a sequence of passed values in a flattened list. From 30151f66afc603e7097ecb66a089f1f416838474 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Thu, 19 Oct 2023 21:37:58 +0200 Subject: [PATCH 02/10] rename to 'execute' Signed-off-by: Anatoly Myachev --- modin/test/test_utils.py | 4 ++-- modin/utils.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/modin/test/test_utils.py b/modin/test/test_utils.py index f5e617a9586..c196f87b4e6 100644 --- a/modin/test/test_utils.py +++ b/modin/test/test_utils.py @@ -340,11 +340,11 @@ def test_assert_dtypes_equal(): assert_dtypes_equal(df1, df2) -def test_wait_computations(): +def test_execute(): df = pd.DataFrame(np.random.rand(100, 64)) partitions = df._query_compiler._modin_frame._partitions mgr_cls = df._query_compiler._modin_frame._partition_mgr_cls with patch.object(mgr_cls, "wait_partitions", new=Mock()): - modin.utils.wait_computations(df) + modin.utils.execute(df) mgr_cls.wait_partitions.assert_called_once() assert (mgr_cls.wait_partitions.call_args[0] == partitions).all() diff --git a/modin/utils.py b/modin/utils.py index df61dfeae3c..aa2e595d9bb 100644 --- a/modin/utils.py +++ b/modin/utils.py @@ -621,7 +621,7 @@ def trigger_import(obj: Any) -> None: modin_frame.force_import() -def wait_computations(obj: Any, *, trigger_hdk_import: bool = False) -> None: +def execute(obj: Any, *, trigger_hdk_import: bool = False) -> None: """ Trigger the `obj` lazy computations, if any, and wait for them to complete. 
From 8ef14530d9aefe9ee4bd5f96a25c03b90639906f Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Thu, 19 Oct 2023 22:56:13 +0200 Subject: [PATCH 03/10] trigger import always if possible; cleanup Signed-off-by: Anatoly Myachev --- modin/conftest.py | 3 ++- .../dataframe/pandas/dataframe/dataframe.py | 3 ++- .../storage_formats/base/query_compiler.py | 2 +- .../storage_formats/pandas/query_compiler.py | 3 ++- .../storage_formats/hdk/query_compiler.py | 8 ++++-- modin/test/test_executions_api.py | 2 +- modin/utils.py | 26 +------------------ 7 files changed, 15 insertions(+), 32 deletions(-) diff --git a/modin/conftest.py b/modin/conftest.py index eeeeb49f536..d860807ec19 100644 --- a/modin/conftest.py +++ b/modin/conftest.py @@ -175,7 +175,8 @@ def __init__(self, modin_frame): def finalize(self): self._modin_frame.finalize() - def wait_computations(self): + def execute(self): + self.finalize() self._modin_frame.wait_computations() @classmethod diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 320c35e224c..96c811b3f0c 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -4097,9 +4097,10 @@ def transpose(self): dtypes=new_dtypes, ) + @lazy_metadata_decorator(apply_axis="both") def finalize(self): """ - Perform all deferred calls on partitions. + Perform all deferred calls on dataframe and partitions. This makes `self` Modin Dataframe independent of a history of queries that were used to build it. 
diff --git a/modin/core/storage_formats/base/query_compiler.py b/modin/core/storage_formats/base/query_compiler.py index 46656274dc9..3bf0e06f0df 100644 --- a/modin/core/storage_formats/base/query_compiler.py +++ b/modin/core/storage_formats/base/query_compiler.py @@ -318,7 +318,7 @@ def finalize(self): pass @abc.abstractmethod - def wait_computations(self): + def execute(self): """Wait for all computations to complete without materializing data.""" pass diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 8e9127314a4..eb7c2e808af 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -278,7 +278,8 @@ def lazy_execution(self): def finalize(self): self._modin_frame.finalize() - def wait_computations(self): + def execute(self): + self.finalize() self._modin_frame.wait_computations() def to_pandas(self): diff --git a/modin/experimental/core/storage_formats/hdk/query_compiler.py b/modin/experimental/core/storage_formats/hdk/query_compiler.py index 8f4b9acaf4d..a580017dc72 100644 --- a/modin/experimental/core/storage_formats/hdk/query_compiler.py +++ b/modin/experimental/core/storage_formats/hdk/query_compiler.py @@ -183,8 +183,12 @@ def finalize(self): # TODO: implement this for HDK storage format raise NotImplementedError() - def wait_computations(self): - raise NotImplementedError() + def execute(self): + try: + self._modin_frame.force_import() + except NotImplementedError: + # at least execute + self._modin_frame._execute() def to_pandas(self): return self._modin_frame.to_pandas() diff --git a/modin/test/test_executions_api.py b/modin/test/test_executions_api.py index 7d7c21f120e..35013132eea 100644 --- a/modin/test/test_executions_api.py +++ b/modin/test/test_executions_api.py @@ -25,7 +25,7 @@ def test_base_abstract_methods(): "__init__", "free", "finalize", - "wait_computations", + "execute", "to_pandas", "from_pandas", 
"from_arrow", diff --git a/modin/utils.py b/modin/utils.py index aa2e595d9bb..c6a473a1307 100644 --- a/modin/utils.py +++ b/modin/utils.py @@ -606,21 +606,6 @@ def try_cast_to_pandas(obj: Any, squeeze: bool = False) -> Any: return obj -def trigger_import(obj: Any) -> None: - """ - Trigger import execution for DataFrames obtained by HDK engine. - - Parameters - ---------- - obj : Any - An object to trigger deferred data import. - """ - if hasattr(obj, "_query_compiler"): - modin_frame = obj._query_compiler._modin_frame - if hasattr(modin_frame, "force_import"): - modin_frame.force_import() - - def execute(obj: Any, *, trigger_hdk_import: bool = False) -> None: """ Trigger the `obj` lazy computations, if any, and wait for them to complete. @@ -634,16 +619,7 @@ def execute(obj: Any, *, trigger_hdk_import: bool = False) -> None: """ if not hasattr(obj, "_query_compiler"): return - - if Engine.get() == "Native": - if trigger_hdk_import: - trigger_import(obj) - else: - obj._query_compiler._modin_frame._execute() - return - - obj._query_compiler.finalize() - obj._query_compiler.wait_computations() + obj._query_compiler.execute() def wrap_into_list(*args: Any, skipna: bool = True) -> List[Any]: From 76aec6507f2515f72bf3e78ee6dc57b0e4219ba3 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Fri, 20 Oct 2023 14:26:27 +0200 Subject: [PATCH 04/10] fix Signed-off-by: Anatoly Myachev --- modin/utils.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/modin/utils.py b/modin/utils.py index c6a473a1307..73d6b1c9c03 100644 --- a/modin/utils.py +++ b/modin/utils.py @@ -606,7 +606,7 @@ def try_cast_to_pandas(obj: Any, squeeze: bool = False) -> Any: return obj -def execute(obj: Any, *, trigger_hdk_import: bool = False) -> None: +def execute(obj: Any) -> None: """ Trigger the `obj` lazy computations, if any, and wait for them to complete. 
@@ -614,8 +614,6 @@ def execute(obj: Any, *, trigger_hdk_import: bool = False) -> None: ---------- obj : Any An object to trigger lazy computations. - trigger_hdk_import : bool, default: False - Trigger import execution. Makes sense only for HDK storage format. """ if not hasattr(obj, "_query_compiler"): return From ad2facde656624953baaba501bd4f320f428db1b Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Fri, 20 Oct 2023 15:47:22 +0200 Subject: [PATCH 05/10] allow to pass iterable object Signed-off-by: Anatoly Myachev --- modin/utils.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/modin/utils.py b/modin/utils.py index 73d6b1c9c03..4d59b68ca1b 100644 --- a/modin/utils.py +++ b/modin/utils.py @@ -27,6 +27,7 @@ from typing import ( Any, Callable, + Iterable, List, Mapping, Optional, @@ -606,18 +607,18 @@ def try_cast_to_pandas(obj: Any, squeeze: bool = False) -> Any: return obj -def execute(obj: Any) -> None: +def execute(objs: Iterable[Any]) -> None: """ - Trigger the `obj` lazy computations, if any, and wait for them to complete. + Trigger the lazy computations for each obj in `objs`, if any, and wait for them to complete. Parameters ---------- - obj : Any - An object to trigger lazy computations. + objs : Iterable[Any] + A collection of objects to trigger lazy computations. 
""" - if not hasattr(obj, "_query_compiler"): - return - obj._query_compiler.execute() + for obj in objs: + if hasattr(obj, "_query_compiler"): + obj._query_compiler.execute() def wrap_into_list(*args: Any, skipna: bool = True) -> List[Any]: From 0f1d3418916470cee4934eef5bb042871bf085c0 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Fri, 20 Oct 2023 17:51:48 +0200 Subject: [PATCH 06/10] fixes Signed-off-by: Anatoly Myachev --- .../core/storage_formats/hdk/query_compiler.py | 10 +++++----- modin/utils.py | 15 +++++++++++---- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/modin/experimental/core/storage_formats/hdk/query_compiler.py b/modin/experimental/core/storage_formats/hdk/query_compiler.py index a580017dc72..79d5a6d8db8 100644 --- a/modin/experimental/core/storage_formats/hdk/query_compiler.py +++ b/modin/experimental/core/storage_formats/hdk/query_compiler.py @@ -184,11 +184,11 @@ def finalize(self): raise NotImplementedError() def execute(self): - try: - self._modin_frame.force_import() - except NotImplementedError: - # at least execute - self._modin_frame._execute() + self._modin_frame._execute() + + def force_import(self): + # HDK-specific method + self._modin_frame.force_import() def to_pandas(self): return self._modin_frame.to_pandas() diff --git a/modin/utils.py b/modin/utils.py index 4d59b68ca1b..b252372b15f 100644 --- a/modin/utils.py +++ b/modin/utils.py @@ -607,18 +607,25 @@ def try_cast_to_pandas(obj: Any, squeeze: bool = False) -> Any: return obj -def execute(objs: Iterable[Any]) -> None: +def execute(*objs: Iterable[Any], trigger_hdk_import=False) -> None: """ Trigger the lazy computations for each obj in `objs`, if any, and wait for them to complete. Parameters ---------- - objs : Iterable[Any] + *objs : Iterable[Any] A collection of objects to trigger lazy computations. + trigger_hdk_import : bool, default: False + Trigger import execution. Makes sense only for HDK storage format. + Safe to use with other storage formats. 
""" for obj in objs: - if hasattr(obj, "_query_compiler"): - obj._query_compiler.execute() + if not hasattr(obj, "_query_compiler"): + continue + query_compiler = obj._query_compiler + query_compiler.execute() + if trigger_hdk_import and hasattr(query_compiler, "force_import"): + query_compiler.force_import() def wrap_into_list(*args: Any, skipna: bool = True) -> List[Any]: From d235435fa5d6e7925d04560bd6db239a973cdd0d Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Fri, 20 Oct 2023 17:57:50 +0200 Subject: [PATCH 07/10] cleanup Signed-off-by: Anatoly Myachev --- modin/core/dataframe/pandas/dataframe/dataframe.py | 5 ++--- .../core/dataframe/pandas/partitioning/partition_manager.py | 3 +-- .../pandas_on_dask/partitioning/partition_manager.py | 1 - .../pandas_on_ray/partitioning/partition_manager.py | 1 - .../pandas_on_unidist/partitioning/partition_manager.py | 1 - .../experimental/core/storage_formats/hdk/query_compiler.py | 1 + modin/test/test_utils.py | 2 +- modin/utils.py | 2 +- 8 files changed, 6 insertions(+), 10 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 96c811b3f0c..527df08beb4 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -4097,10 +4097,9 @@ def transpose(self): dtypes=new_dtypes, ) - @lazy_metadata_decorator(apply_axis="both") def finalize(self): """ - Perform all deferred calls on dataframe and partitions. + Perform all deferred calls on partitions. This makes `self` Modin Dataframe independent of a history of queries that were used to build it. 
@@ -4109,7 +4108,7 @@ def finalize(self): def wait_computations(self): """Wait for all computations to complete without materializing data.""" - self._partition_mgr_cls.wait_partitions(self._partitions) + self._partition_mgr_cls.wait_partitions(self._partitions.flatten()) def __dataframe__(self, nan_as_null: bool = False, allow_copy: bool = True): """ diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index 8511068eccb..fea0c686a96 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -74,7 +74,7 @@ def wait(cls, *args, **kwargs): cls.finalize(partitions) # The partition manager invokes the relevant .wait() method under # the hood, which should wait in parallel for all computations to finish - cls.wait_partitions(partitions) + cls.wait_partitions(partitions.flatten()) return result return wait @@ -952,7 +952,6 @@ def wait_partitions(cls, partitions): This method should be implemented in a more efficient way for engines that supports waiting on objects in parallel. """ - partitions = partitions.flatten() for partition in partitions: partition.wait() diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition_manager.py b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition_manager.py index c0e9d1d7a04..f045c6ef392 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition_manager.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition_manager.py @@ -46,7 +46,6 @@ def wait_partitions(cls, partitions): partitions : np.ndarray NumPy array with ``PandasDataframePartition``-s. 
""" - partitions = partitions.flatten() DaskWrapper.wait( [block for partition in partitions for block in partition.list_of_blocks] ) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py index 60a72aa4020..074e60a30e2 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py @@ -53,7 +53,6 @@ def wait_partitions(cls, partitions): partitions : np.ndarray NumPy array with ``PandasDataframePartition``-s. """ - partitions = partitions.flatten() RayWrapper.wait( [block for partition in partitions for block in partition.list_of_blocks] ) diff --git a/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/partition_manager.py b/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/partition_manager.py index 23576964d86..181de9f3f72 100644 --- a/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/partition_manager.py +++ b/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/partition_manager.py @@ -47,7 +47,6 @@ def wait_partitions(cls, partitions): partitions : np.ndarray NumPy array with ``PandasDataframePartition``-s. 
""" - partitions = partitions.flatten() UnidistWrapper.wait( [block for partition in partitions for block in partition.list_of_blocks] ) diff --git a/modin/experimental/core/storage_formats/hdk/query_compiler.py b/modin/experimental/core/storage_formats/hdk/query_compiler.py index 79d5a6d8db8..b4a580478cd 100644 --- a/modin/experimental/core/storage_formats/hdk/query_compiler.py +++ b/modin/experimental/core/storage_formats/hdk/query_compiler.py @@ -187,6 +187,7 @@ def execute(self): self._modin_frame._execute() def force_import(self): + """Force table import.""" # HDK-specific method self._modin_frame.force_import() diff --git a/modin/test/test_utils.py b/modin/test/test_utils.py index c196f87b4e6..8f024cd015f 100644 --- a/modin/test/test_utils.py +++ b/modin/test/test_utils.py @@ -342,7 +342,7 @@ def test_assert_dtypes_equal(): def test_execute(): df = pd.DataFrame(np.random.rand(100, 64)) - partitions = df._query_compiler._modin_frame._partitions + partitions = df._query_compiler._modin_frame._partitions.flatten() mgr_cls = df._query_compiler._modin_frame._partition_mgr_cls with patch.object(mgr_cls, "wait_partitions", new=Mock()): modin.utils.execute(df) diff --git a/modin/utils.py b/modin/utils.py index b252372b15f..c42a0516fa3 100644 --- a/modin/utils.py +++ b/modin/utils.py @@ -607,7 +607,7 @@ def try_cast_to_pandas(obj: Any, squeeze: bool = False) -> Any: return obj -def execute(*objs: Iterable[Any], trigger_hdk_import=False) -> None: +def execute(*objs: Iterable[Any], trigger_hdk_import: bool = False) -> None: """ Trigger the lazy computations for each obj in `objs`, if any, and wait for them to complete. 
From 49c30f4aa80d0db9ec1e249346ec5ceed749d9b1 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Fri, 20 Oct 2023 18:46:43 +0200 Subject: [PATCH 08/10] update test Signed-off-by: Anatoly Myachev --- modin/test/test_utils.py | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/modin/test/test_utils.py b/modin/test/test_utils.py index 8f024cd015f..8f1b68a60f7 100644 --- a/modin/test/test_utils.py +++ b/modin/test/test_utils.py @@ -341,10 +341,30 @@ def test_assert_dtypes_equal(): def test_execute(): - df = pd.DataFrame(np.random.rand(100, 64)) - partitions = df._query_compiler._modin_frame._partitions.flatten() - mgr_cls = df._query_compiler._modin_frame._partition_mgr_cls + data = np.random.rand(100, 64) + modin_df, pandas_df = create_test_dfs(data) + partitions = modin_df._query_compiler._modin_frame._partitions.flatten() + mgr_cls = modin_df._query_compiler._modin_frame._partition_mgr_cls + + # check modin case with patch.object(mgr_cls, "wait_partitions", new=Mock()): - modin.utils.execute(df) + modin.utils.execute(modin_df) mgr_cls.wait_partitions.assert_called_once() assert (mgr_cls.wait_partitions.call_args[0] == partitions).all() + + # check pandas case without error + with patch.object(mgr_cls, "wait_partitions", new=Mock()): + modin.utils.execute(pandas_df) + mgr_cls.wait_partitions.assert_not_called() + + # make sure `trigger_hdk_import=True` doesn't break anything + # when using other storage formats + with patch.object(mgr_cls, "wait_partitions", new=Mock()): + modin.utils.execute(modin_df, trigger_hdk_import=True) + mgr_cls.wait_partitions.assert_called_once() + + # check several modin dataframes + with patch.object(mgr_cls, "wait_partitions", new=Mock()): + modin.utils.execute(modin_df, modin_df[modin_df.columns[:4]]) + mgr_cls.wait_partitions.assert_called + assert mgr_cls.wait_partitions.call_count == 2 From 404c2ba2622d524ab5c76ec8e27e0fd2d0d373c1 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Mon, 
23 Oct 2023 15:15:20 +0200 Subject: [PATCH 09/10] add docs Signed-off-by: Anatoly Myachev --- docs/development/architecture.rst | 1 + docs/flow/modin/utils.rst | 12 ++++++++++++ 2 files changed, 13 insertions(+) create mode 100644 docs/flow/modin/utils.rst diff --git a/docs/development/architecture.rst b/docs/development/architecture.rst index 130813a86f5..22fc7f91496 100644 --- a/docs/development/architecture.rst +++ b/docs/development/architecture.rst @@ -304,6 +304,7 @@ details. The documentation covers most modules, with more docs being added every ├───examples ├───modin │ ├─── :doc:`config ` + │ ├─── :doc:`utils ` │ ├───core │ │ ├─── :doc:`dataframe ` │ │ │ ├─── :doc:`algebra ` diff --git a/docs/flow/modin/utils.rst b/docs/flow/modin/utils.rst new file mode 100644 index 00000000000..70c622d1d7a --- /dev/null +++ b/docs/flow/modin/utils.rst @@ -0,0 +1,12 @@ +:orphan: + +Modin Utils +""""""""""" + +Here are utilities that can be useful when working with Modin. + +Public API +'''''''''' + +.. autofunction:: modin.utils.try_cast_to_pandas +.. autofunction:: modin.utils.execute From 55d9a86dcd2546e6364083856aa3af694228915d Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Mon, 23 Oct 2023 18:51:07 +0200 Subject: [PATCH 10/10] suggestion to use 'execute' instead of 'repr' Signed-off-by: Anatoly Myachev --- docs/usage_guide/benchmarking.rst | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/docs/usage_guide/benchmarking.rst b/docs/usage_guide/benchmarking.rst index 7340a31db97..551c9950ae7 100644 --- a/docs/usage_guide/benchmarking.rst +++ b/docs/usage_guide/benchmarking.rst @@ -8,8 +8,8 @@ To benchmark a single Modin function, often turning on the :code:`BenchmarkMode` will suffice. There is no simple way to benchmark more complex Modin workflows, though -benchmark mode or calling ``repr`` on Modin objects may be useful. 
The -:doc:`Modin logs ` may help you +benchmark mode or calling ``modin.utils.execute`` on Modin objects may be useful. +The :doc:`Modin logs ` may help you identify bottlenecks in your code, and they may also help profile the execution of each Modin function. @@ -125,9 +125,8 @@ at each Modin :doc:`layer `. Log mode is more useful when used in conjuction with benchmark mode. Sometimes, if you don't have a natural end-point to your workflow, you can -just call ``repr`` on the workflow's final Modin objects. That will typically -block on any asynchronous computation. However, beware that ``repr`` can also -be misleading, e.g. here: +just call ``modin.utils.execute`` on the workflow's final Modin objects. +That will typically block on any asynchronous computation: .. code-block:: python @@ -137,6 +136,7 @@ be misleading, e.g. here: import modin.pandas as pd from modin.config import MinPartitionSize, NPartitions + import modin.utils MinPartitionSize.put(32) NPartitions.put(16) @@ -149,17 +149,13 @@ be misleading, e.g. here: ray.init() df1 = pd.DataFrame(list(range(10_000)), columns=['A']) result = df1.map(slow_add_one) - %time repr(result) - # time.sleep(10) + # %time modin.utils.execute(result) %time result.to_parquet(BytesIO()) .. code-block::python -The ``repr`` takes only 802 milliseconds, but writing the result to a buffer -takes 9.84 seconds. However, if you uncomment the :code:`time.sleep` before the -:code:`to_parquet` call, the :code:`to_parquet` takes just 23.8 milliseconds! -The problem is that the ``repr`` blocks only on getting the first few and the -last few rows, but the slow execution is for row 5001, which Modin is -computing asynchronously in the background even after ``repr`` finishes. +Writing the result to a buffer takes 9.84 seconds. However, if you uncomment +the :code:`%time modin.utils.execute(result)` before the :code:`to_parquet` +call, the :code:`to_parquet` takes just 23.8 milliseconds! .. 
note:: If you see any Modin documentation touting Modin's speed without using