Skip to content

Commit

Permalink
add new methods for internals
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoly Myachev <[email protected]>
  • Loading branch information
anmyachev committed Oct 13, 2023
1 parent f5c6121 commit 6f4088a
Show file tree
Hide file tree
Showing 10 changed files with 38 additions and 13 deletions.
4 changes: 4 additions & 0 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4106,6 +4106,10 @@ def finalize(self):
"""
self._partition_mgr_cls.finalize(self._partitions)

def wait_computations(self):
    """Wait for all computations to complete without materializing data."""
    # The partitions array is handed over as-is; the ``wait_partitions``
    # implementations shown in this change flatten it themselves.
    self._partition_mgr_cls.wait_partitions(self._partitions)

Check warning on line 4111 in modin/core/dataframe/pandas/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

modin/core/dataframe/pandas/dataframe/dataframe.py#L4111

Added line #L4111 was not covered by tests

def __dataframe__(self, nan_as_null: bool = False, allow_copy: bool = True):
"""
Get a Modin DataFrame that implements the dataframe exchange protocol.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def wait(cls, *args, **kwargs):
cls.finalize(partitions)
# The partition manager invokes the relevant .wait() method under
# the hood, which should wait in parallel for all computations to finish
cls.wait_partitions(partitions.flatten())
cls.wait_partitions(partitions)

Check warning on line 77 in modin/core/dataframe/pandas/partitioning/partition_manager.py

View check run for this annotation

Codecov / codecov/patch

modin/core/dataframe/pandas/partitioning/partition_manager.py#L77

Added line #L77 was not covered by tests
return result

return wait
Expand Down Expand Up @@ -930,6 +930,7 @@ def wait_partitions(cls, partitions):
This method should be implemented in a more efficient way for engines that support
waiting on objects in parallel.
"""
partitions = partitions.flatten()

Check warning on line 933 in modin/core/dataframe/pandas/partitioning/partition_manager.py

View check run for this annotation

Codecov / codecov/patch

modin/core/dataframe/pandas/partitioning/partition_manager.py#L933

Added line #L933 was not covered by tests
for partition in partitions:
partition.wait()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def wait_partitions(cls, partitions):
partitions : np.ndarray
NumPy array with ``PandasDataframePartition``-s.
"""
partitions = partitions.flatten()
DaskWrapper.wait(

Check warning on line 50 in modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition_manager.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition_manager.py#L49-L50

Added lines #L49 - L50 were not covered by tests
[block for partition in partitions for block in partition.list_of_blocks]
)
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def wait_partitions(cls, partitions):
partitions : np.ndarray
NumPy array with ``PandasDataframePartition``-s.
"""
partitions = partitions.flatten()
RayWrapper.wait(
[block for partition in partitions for block in partition.list_of_blocks]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def wait_partitions(cls, partitions):
partitions : np.ndarray
NumPy array with ``PandasDataframePartition``-s.
"""
partitions = partitions.flatten()

Check warning on line 50 in modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/partition_manager.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/partition_manager.py#L50

Added line #L50 was not covered by tests
UnidistWrapper.wait(
[block for partition in partitions for block in partition.list_of_blocks]
)
Expand Down
5 changes: 5 additions & 0 deletions modin/core/storage_formats/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,11 @@ def finalize(self):
"""Finalize constructing the dataframe calling all deferred functions which were used to build it."""
pass

@abc.abstractmethod
def wait_computations(self):
    """
    Wait for all computations to complete without materializing data.

    Abstract: the base implementation is a no-op placeholder, and every
    concrete query compiler is expected to provide its own override.
    """
    pass

# END Data Management Methods

# To/From Pandas
Expand Down
3 changes: 3 additions & 0 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ def lazy_execution(self):
def finalize(self):
    # Thin delegate: forwards the call to the underlying Modin frame.
    self._modin_frame.finalize()

def wait_computations(self):
    # Thin delegate: the underlying Modin frame does the waiting; nothing is
    # returned to the caller.
    self._modin_frame.wait_computations()

Check warning on line 282 in modin/core/storage_formats/pandas/query_compiler.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler.py#L282

Added line #L282 was not covered by tests

def to_pandas(self):
    # Thin delegate: returns whatever the underlying Modin frame's
    # ``to_pandas`` returns.
    return self._modin_frame.to_pandas()

Expand Down
3 changes: 3 additions & 0 deletions modin/experimental/core/storage_formats/hdk/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ def finalize(self):
# TODO: implement this for HDK storage format
raise NotImplementedError()

def wait_computations(self):

Check warning on line 186 in modin/experimental/core/storage_formats/hdk/query_compiler.py

View check run for this annotation

Codecov / codecov/patch

modin/experimental/core/storage_formats/hdk/query_compiler.py#L186

Added line #L186 was not covered by tests
raise NotImplementedError()

def to_pandas(self):
    # Thin delegate: returns whatever the underlying Modin frame's
    # ``to_pandas`` returns.
    return self._modin_frame.to_pandas()

Expand Down
1 change: 1 addition & 0 deletions modin/test/test_executions_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def test_base_abstract_methods():
"__init__",
"free",
"finalize",
"wait_computations",
"to_pandas",
"from_pandas",
"from_arrow",
Expand Down
29 changes: 17 additions & 12 deletions modin/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,23 +606,32 @@ def try_cast_to_pandas(obj: Any, squeeze: bool = False) -> Any:
return obj


def trigger_import(df: Any):
def trigger_import(obj: Any) -> None:
"""
Trigger import execution for DataFrames obtained by HDK engine.
Parameters
----------
df : Any
DataFrames to trigger import.
obj : Any
An object to trigger deferred data import.
"""
if hasattr(df, "_query_compiler"):
modin_frame = df._query_compiler._modin_frame
if hasattr(obj, "_query_compiler"):
modin_frame = obj._query_compiler._modin_frame
if hasattr(modin_frame, "force_import"):
modin_frame.force_import()

Check warning on line 621 in modin/utils.py

View check run for this annotation

Codecov / codecov/patch

modin/utils.py#L618-L621

Added lines #L618 - L621 were not covered by tests


def wait_computations(obj: Any, *, trigger_hdk_import: bool = False):
"""Make sure obj's computations finished."""
def wait_computations(obj: Any, *, trigger_hdk_import: bool = False) -> None:
"""
Trigger the `obj` lazy computations, if any, and wait for them to complete.
Parameters
----------
obj : Any
An object to trigger lazy computations.
trigger_hdk_import : bool, default: False
Trigger import execution. Makes sense only for HDK storage format.
"""
if not hasattr(obj, "_query_compiler"):
return

Check warning on line 636 in modin/utils.py

View check run for this annotation

Codecov / codecov/patch

modin/utils.py#L635-L636

Added lines #L635 - L636 were not covered by tests

Expand All @@ -634,11 +643,7 @@ def wait_computations(obj: Any, *, trigger_hdk_import: bool = False):
return

Check warning on line 643 in modin/utils.py

View check run for this annotation

Codecov / codecov/patch

modin/utils.py#L642-L643

Added lines #L642 - L643 were not covered by tests

obj._query_compiler.finalize()

partitions = obj._query_compiler._modin_frame._partitions.flatten()
mgr_cls = obj._query_compiler._modin_frame._partition_mgr_cls
if len(partitions) and hasattr(mgr_cls, "wait_partitions"):
mgr_cls.wait_partitions(partitions)
obj._query_compiler.wait_computations()

Check warning on line 646 in modin/utils.py

View check run for this annotation

Codecov / codecov/patch

modin/utils.py#L645-L646

Added lines #L645 - L646 were not covered by tests


def wrap_into_list(*args: Any, skipna: bool = True) -> List[Any]:
Expand Down

0 comments on commit 6f4088a

Please sign in to comment.