From f5c612154591d392a3f7bf3a4a72653182048c6f Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Sat, 14 Oct 2023 00:46:31 +0200 Subject: [PATCH] FEAT-#5221: add 'wait_computations' to trigger lazy computations and wait for them to complete Signed-off-by: Anatoly Myachev --- asv_bench/benchmarks/utils/common.py | 8 +++---- modin/utils.py | 35 ++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/asv_bench/benchmarks/utils/common.py b/asv_bench/benchmarks/utils/common.py index 81f2292a5fc..12151edfda8 100644 --- a/asv_bench/benchmarks/utils/common.py +++ b/asv_bench/benchmarks/utils/common.py @@ -458,11 +458,11 @@ def trigger_import(*dfs): *dfs : iterable DataFrames to trigger import. """ - if ASV_USE_STORAGE_FORMAT != "hdk" or ASV_USE_IMPL == "pandas": - return - for df in dfs: - df._query_compiler._modin_frame.force_import() + if hasattr(df, "_query_compiler"): + modin_frame = df._query_compiler._modin_frame + if hasattr(modin_frame, "force_import"): + modin_frame.force_import() def execute( diff --git a/modin/utils.py b/modin/utils.py index 6aba77f530f..0bf8cb14f3f 100644 --- a/modin/utils.py +++ b/modin/utils.py @@ -606,6 +606,41 @@ def try_cast_to_pandas(obj: Any, squeeze: bool = False) -> Any: return obj +def trigger_import(df: Any): + """ + Trigger import execution for DataFrames obtained by HDK engine. + + Parameters + ---------- + df : Any + DataFrames to trigger import. + """ + if hasattr(df, "_query_compiler"): + modin_frame = df._query_compiler._modin_frame + if hasattr(modin_frame, "force_import"): + modin_frame.force_import() + + +def wait_computations(obj: Any, *, trigger_hdk_import: bool = False): + """Make sure obj's computations finished.""" + if not hasattr(obj, "_query_compiler"): + return + + if Engine.get() == "Native": + if trigger_hdk_import: + trigger_import(obj) + else: + obj._query_compiler._modin_frame._execute() + return + + obj._query_compiler.finalize() + + partitions = obj._query_compiler._modin_frame._partitions.flatten() + mgr_cls = obj._query_compiler._modin_frame._partition_mgr_cls + if len(partitions) and hasattr(mgr_cls, "wait_partitions"): + mgr_cls.wait_partitions(partitions) + + def wrap_into_list(*args: Any, skipna: bool = True) -> List[Any]: """ Wrap a sequence of passed values in a flattened list.