diff --git a/modin/core/storage_formats/pandas/merge.py b/modin/core/storage_formats/pandas/merge.py index d1e05f113d9..1fe86ae1f56 100644 --- a/modin/core/storage_formats/pandas/merge.py +++ b/modin/core/storage_formats/pandas/merge.py @@ -13,15 +13,23 @@ """Contains implementations for Merge/Join.""" +from __future__ import annotations + +from typing import TYPE_CHECKING + import pandas from pandas.core.dtypes.common import is_list_like from pandas.errors import MergeError from modin.core.dataframe.base.dataframe.utils import join_columns from modin.core.dataframe.pandas.metadata import ModinDtypes +from modin.config import NPartitions, MinPartitionSize from .utils import merge_partitioning +if TYPE_CHECKING: + from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler + # TODO: add methods for 'join' here class MergeImpl: @@ -93,7 +101,9 @@ def func(left, right): ).reset_index(drop=True) @classmethod - def row_axis_merge(cls, left, right, kwargs): + def row_axis_merge( + cls, left: PandasQueryCompiler, right: PandasQueryCompiler, kwargs: dict + ): """ Execute merge using row-axis implementation. @@ -164,6 +174,14 @@ def map_func( left, right, on, left_on, right_on, kwargs.get("suffixes", ("_x", "_y")) ) + # the left frame has too few row partitions relative to NPartitions, so repartitioning it first is more profitable + if ( + left._modin_frame._partitions.shape[0] < 0.3 * NPartitions.get() + # avoid creating empty partitions after repartitioning; note that computing the length can materialize the index + and len(left._modin_frame) > NPartitions.get() * MinPartitionSize.get() + ): + left = left.repartition(axis=0) + new_left = left.__constructor__( left._modin_frame.broadcast_apply_full_axis( axis=1,