diff --git a/asv_bench/benchmarks/utils/common.py b/asv_bench/benchmarks/utils/common.py index 81f2292a5fc..12151edfda8 100644 --- a/asv_bench/benchmarks/utils/common.py +++ b/asv_bench/benchmarks/utils/common.py @@ -458,11 +458,11 @@ def trigger_import(*dfs): *dfs : iterable DataFrames to trigger import. """ - if ASV_USE_STORAGE_FORMAT != "hdk" or ASV_USE_IMPL == "pandas": - return - for df in dfs: - df._query_compiler._modin_frame.force_import() + if hasattr(df, "_query_compiler"): + modin_frame = df._query_compiler._modin_frame + if hasattr(modin_frame, "force_import"): + modin_frame.force_import() def execute( diff --git a/modin/utils.py b/modin/utils.py index 6aba77f530f..0bf8cb14f3f 100644 --- a/modin/utils.py +++ b/modin/utils.py @@ -606,6 +606,41 @@ def try_cast_to_pandas(obj: Any, squeeze: bool = False) -> Any: return obj +def trigger_import(df: Any): + """ + Trigger import execution for DataFrames obtained by HDK engine. + + Parameters + ---------- + df : Any + DataFrames to trigger import. + """ + if hasattr(df, "_query_compiler"): + modin_frame = df._query_compiler._modin_frame + if hasattr(modin_frame, "force_import"): + modin_frame.force_import() + + +def wait_computations(obj: Any, *, trigger_hdk_import: bool = False): + """Make sure obj's computations finished.""" + if not hasattr(obj, "_query_compiler"): + return + + if Engine.get() == "Native": + if trigger_hdk_import: + trigger_import(obj) + else: + obj._query_compiler._modin_frame._execute() + return + + obj._query_compiler.finalize() + + partitions = obj._query_compiler._modin_frame._partitions.flatten() + mgr_cls = obj._query_compiler._modin_frame._partition_mgr_cls + if len(partitions) and hasattr(mgr_cls, "wait_partitions"): + mgr_cls.wait_partitions(partitions) + + def wrap_into_list(*args: Any, skipna: bool = True) -> List[Any]: """ Wrap a sequence of passed values in a flattened list.