From e183124db6137499720ba194dd92691f8cf435ff Mon Sep 17 00:00:00 2001 From: Kirill Suvorov Date: Tue, 13 Aug 2024 08:25:53 +0000 Subject: [PATCH] Revert 'Change DP conditions' --- .../dataframe/pandas/dataframe/dataframe.py | 45 ++-------------- .../pandas/partitioning/partition_manager.py | 53 ++++++++++++++++++- 2 files changed, 55 insertions(+), 43 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index a4d9c1a515e..5456f28f127 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -33,7 +33,6 @@ from pandas.core.indexes.api import Index, RangeIndex from modin.config import ( - CpuCount, Engine, IsRayCluster, MinColumnPartitionSize, @@ -212,22 +211,6 @@ def num_parts(self) -> int: """ return np.prod(self._partitions.shape) - @property - def size(self) -> Optional[int]: - """ - Get an int representing the number of elements in this frame, if known. - - Returns - ------- - int or None - """ - if self.has_index_cache and self.has_columns_cache: - return len(self.index) * len(self.columns) - elif self._row_lengths_cache and self._column_widths_cache: - return sum(self._row_lengths_cache) * sum(self._column_widths_cache) - else: - return None - @property def row_lengths(self): """ @@ -3282,31 +3265,9 @@ def broadcast_apply( axis ), self.copy_axis_cache(axis) - # check the conditions for use of dynamic partitioning - use_dynamic_partitioning = False - if self.num_parts <= 1.5 * CpuCount.get(): - use_dynamic_partitioning = True - - # When the frame is large, dynamic partitioning - # performs worse than the based approach - frame_size = self.size - if frame_size and (frame_size >= 4 * 10**9 or len(self) >= 10**7): - use_dynamic_partitioning = False - - if use_dynamic_partitioning: - new_frame = self._partition_mgr_cls.broadcast_axis_partitions( - axis=axis ^ 1, - left=left_parts, - right=right_parts, - apply_func=func, - broadcast_all=False, - keep_partitioning=True, - ) - else: - new_frame = self._partition_mgr_cls.broadcast_apply( - axis, func, left_parts, right_parts - ) - + new_frame = self._partition_mgr_cls.broadcast_apply( + axis, func, left_parts, right_parts + ) if isinstance(dtypes, str) and dtypes == "copy": dtypes = self.copy_dtypes_cache() diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index 22b9f4c0060..bd3a1d14760 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -439,7 +439,7 @@ def get_partitions(index): @classmethod @wait_computations_if_benchmark_mode - def broadcast_apply(cls, axis, apply_func, left, right): + def base_broadcast_apply(cls, axis, apply_func, left, right): """ Broadcast the `right` partitions to `left` and apply `apply_func` function. @@ -652,6 +652,57 @@ def base_map_partitions( ] ) + @classmethod + @wait_computations_if_benchmark_mode + def broadcast_apply( + cls, + axis, + apply_func, + left, + right, + ): + """ + Broadcast the `right` partitions to `left` and apply `apply_func` function using different approaches to achieve the best performance. + + Parameters + ---------- + axis : {0, 1} + Axis to apply and broadcast over. + apply_func : callable + Function to apply. + left : np.ndarray + NumPy array of left partitions. + right : np.ndarray + NumPy array of right partitions. + + Returns + ------- + np.ndarray + NumPy array of result partition objects. + """ + # The `broadcast_apply` runtime condition differs from + # the same condition in `map_partitions` because the columnar + # approach for `broadcast_apply` results in a slowdown. + if np.prod(left.shape) <= 1.5 * CpuCount.get(): + # block-wise broadcast + new_partitions = cls.base_broadcast_apply( + axis, + apply_func, + left, + right, + ) + else: + # axis-wise broadcast + new_partitions = cls.broadcast_axis_partitions( + axis=axis ^ 1, + left=left, + right=right, + apply_func=apply_func, + broadcast_all=False, + keep_partitioning=True, + ) + return new_partitions + @classmethod @wait_computations_if_benchmark_mode def map_partitions(