Skip to content

Commit

Permalink
FEAT-#5221: add 'wait_computations' to trigger lazy computations and …
Browse files Browse the repository at this point in the history
…wait for them to complete

Signed-off-by: Anatoly Myachev <[email protected]>
  • Loading branch information
anmyachev committed Oct 13, 2023
1 parent ce54013 commit f5c6121
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 4 deletions.
8 changes: 4 additions & 4 deletions asv_bench/benchmarks/utils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,11 +458,11 @@ def trigger_import(*dfs):
*dfs : iterable
DataFrames to trigger import.
"""
if ASV_USE_STORAGE_FORMAT != "hdk" or ASV_USE_IMPL == "pandas":
return

for df in dfs:
df._query_compiler._modin_frame.force_import()
if hasattr(df, "_query_compiler"):
modin_frame = df._query_compiler._modin_frame
if hasattr(modin_frame, "force_import"):
modin_frame.force_import()


def execute(
Expand Down
35 changes: 35 additions & 0 deletions modin/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,41 @@ def try_cast_to_pandas(obj: Any, squeeze: bool = False) -> Any:
return obj


def trigger_import(df: Any):
"""
Trigger import execution for DataFrames obtained by HDK engine.
Parameters
----------
df : Any
DataFrames to trigger import.
"""
if hasattr(df, "_query_compiler"):
modin_frame = df._query_compiler._modin_frame
if hasattr(modin_frame, "force_import"):
modin_frame.force_import()

Check warning on line 621 in modin/utils.py

View check run for this annotation

Codecov / codecov/patch

modin/utils.py#L618-L621

Added lines #L618 - L621 were not covered by tests


def wait_computations(obj: Any, *, trigger_hdk_import: bool = False):
"""Make sure obj's computations finished."""
if not hasattr(obj, "_query_compiler"):
return

Check warning on line 627 in modin/utils.py

View check run for this annotation

Codecov / codecov/patch

modin/utils.py#L626-L627

Added lines #L626 - L627 were not covered by tests

if Engine.get() == "Native":
if trigger_hdk_import:
trigger_import(obj)

Check warning on line 631 in modin/utils.py

View check run for this annotation

Codecov / codecov/patch

modin/utils.py#L629-L631

Added lines #L629 - L631 were not covered by tests
else:
obj._query_compiler._modin_frame._execute()
return

Check warning on line 634 in modin/utils.py

View check run for this annotation

Codecov / codecov/patch

modin/utils.py#L633-L634

Added lines #L633 - L634 were not covered by tests

obj._query_compiler.finalize()

Check warning on line 636 in modin/utils.py

View check run for this annotation

Codecov / codecov/patch

modin/utils.py#L636

Added line #L636 was not covered by tests

partitions = obj._query_compiler._modin_frame._partitions.flatten()
mgr_cls = obj._query_compiler._modin_frame._partition_mgr_cls
if len(partitions) and hasattr(mgr_cls, "wait_partitions"):
mgr_cls.wait_partitions(partitions)

Check warning on line 641 in modin/utils.py

View check run for this annotation

Codecov / codecov/patch

modin/utils.py#L638-L641

Added lines #L638 - L641 were not covered by tests


def wrap_into_list(*args: Any, skipna: bool = True) -> List[Any]:
"""
Wrap a sequence of passed values in a flattened list.
Expand Down

0 comments on commit f5c6121

Please sign in to comment.