From b8323b5f46e25d257c08f55745c018197f7530f5 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Tue, 21 Nov 2023 19:01:35 +0100 Subject: [PATCH] PERF-#6762: Carry dtypes information in lazy indices (#6763) Signed-off-by: Dmitry Chigarev --- modin/core/dataframe/algebra/groupby.py | 21 ++++++- .../dataframe/pandas/dataframe/dataframe.py | 6 ++ modin/core/dataframe/pandas/metadata/index.py | 26 +++++++- .../storage_formats/pandas/query_compiler.py | 59 +++++++++++++++---- .../storage_formats/pandas/test_internals.py | 47 +++++++++++++++ 5 files changed, 144 insertions(+), 15 deletions(-) diff --git a/modin/core/dataframe/algebra/groupby.py b/modin/core/dataframe/algebra/groupby.py index e4c03feb63f..6a9dc67b51a 100644 --- a/modin/core/dataframe/algebra/groupby.py +++ b/modin/core/dataframe/algebra/groupby.py @@ -15,6 +15,7 @@ import pandas +from modin.core.dataframe.pandas.metadata import ModinIndex from modin.error_message import ErrorMessage from modin.utils import MODIN_UNNAMED_SERIES_LABEL, hashable @@ -407,8 +408,26 @@ def caller( # Otherwise `by` was already bound to the Map function in `build_map_reduce_functions`. broadcastable_by = getattr(by, "_modin_frame", None) apply_indices = list(map_func.keys()) if isinstance(map_func, dict) else None + if ( + broadcastable_by is not None + and groupby_kwargs.get("as_index", True) + and broadcastable_by.has_materialized_dtypes + ): + new_index = ModinIndex( + # value can be anything here, as it will be reassigned on a parent update + value=query_compiler._modin_frame, + axis=0, + dtypes=broadcastable_by.dtypes, + ) + else: + new_index = None new_modin_frame = query_compiler._modin_frame.groupby_reduce( - axis, broadcastable_by, map_fn, reduce_fn, apply_indices=apply_indices + axis, + broadcastable_by, + map_fn, + reduce_fn, + apply_indices=apply_indices, + new_index=new_index, ) result = query_compiler.__constructor__(new_modin_frame) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 81092b7eb57..4be07569f5b 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3911,6 +3911,12 @@ def join_cols(df, *cols): row_lengths=result._row_lengths_cache, ) + if not result.has_materialized_index: + by_dtypes = ModinDtypes(self._dtypes).lazy_get(by) + if by_dtypes.is_materialized: + new_index = ModinIndex(value=result, axis=0, dtypes=by_dtypes) + result.set_index_cache(new_index) + if result_schema is not None: new_dtypes = pandas.Series(result_schema) diff --git a/modin/core/dataframe/pandas/metadata/index.py b/modin/core/dataframe/pandas/metadata/index.py index 6242b61e2f8..52a6d536aaf 100644 --- a/modin/core/dataframe/pandas/metadata/index.py +++ b/modin/core/dataframe/pandas/metadata/index.py @@ -35,13 +35,16 @@ class ModinIndex: axis : int, optional Specifies an axis the object represents, serves as an optional hint. This parameter must be passed in case value is a ``PandasDataframe``. + dtypes : pandas.Series, optional + Materialized dtypes of index levels. """ - def __init__(self, value, axis=None): + def __init__(self, value, axis=None, dtypes=None): from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe self._is_default_callable = False self._axis = axis + self._dtypes = dtypes if callable(value): self._value = value @@ -58,6 +61,25 @@ def __init__(self, value, axis=None): self._index_id = uuid.uuid4() self._lengths_id = uuid.uuid4() + def maybe_get_dtypes(self): + """ + Get index dtypes if available. + + Returns + ------- + pandas.Series or None + """ + if self._dtypes is not None: + return self._dtypes + if self.is_materialized: + self._dtypes = ( + self._value.dtypes + if isinstance(self._value, pandas.MultiIndex) + else pandas.Series([self._value.dtype], index=[self._value.name]) + ) + return self._dtypes + return None + @staticmethod def _get_default_callable(dataframe_obj, axis): """ @@ -308,7 +330,7 @@ def copy(self, copy_lengths=False) -> "ModinIndex": idx_cache = self._value if not callable(idx_cache): idx_cache = idx_cache.copy() - result = ModinIndex(idx_cache, axis=self._axis) + result = ModinIndex(idx_cache, axis=self._axis, dtypes=self._dtypes) result._index_id = self._index_id result._is_default_callable = self._is_default_callable if copy_lengths: diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 14020e031c6..373d0d876ab 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -61,6 +61,7 @@ from modin.core.dataframe.pandas.metadata import ( DtypesDescriptor, ModinDtypes, + ModinIndex, extract_dtype, ) from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler @@ -589,16 +590,9 @@ def map_func( [right_pandas.dtypes, right_index_dtypes] )[right_renamer.keys()].rename(right_renamer) - left_index_dtypes = None - if self._modin_frame.has_materialized_index: - left_index_dtypes = ( - self.index.dtypes - if isinstance(self.index, pandas.MultiIndex) - else pandas.Series( - [self.index.dtype], index=[self.index.name] - ) - ) - + left_index_dtypes = ( + self._modin_frame._index_cache.maybe_get_dtypes() + ) left_dtypes = ( ModinDtypes.concat( [self._modin_frame._dtypes, left_index_dtypes] @@ -755,12 +749,36 @@ def _reset(df, *axis_lengths, partition_idx): # pragma: no cover copy_lengths=True ) else: - # concat index dtypes (None, since they're unknown) with column dtypes + # concat index dtypes with column dtypes + index_dtypes = self._modin_frame._index_cache.maybe_get_dtypes() try: - dtypes = ModinDtypes.concat([None, self._modin_frame._dtypes]) + dtypes = ModinDtypes.concat( + [ + index_dtypes, + self._modin_frame._dtypes, + ] + ) except NotImplementedError: # may raise on duplicated names in materialized 'self.dtypes' dtypes = None + if ( + # can precompute new columns if we know columns and index names + self._modin_frame.has_materialized_columns + and index_dtypes is not None + ): + empty_index = ( + pandas.Index([0], name=index_dtypes.index[0]) + if len(index_dtypes) == 1 + else pandas.MultiIndex.from_arrays( + [[i] for i in range(len(index_dtypes))], + names=index_dtypes.index, + ) + ) + new_columns = ( + pandas.DataFrame(columns=self.columns, index=empty_index) + .reset_index(**kwargs) + .columns + ) return self.__constructor__( self._modin_frame.apply_full_axis( @@ -4124,12 +4142,29 @@ def compute_groupby(df, drop=False, partition_idx=0): else: apply_indices = None + if ( + # For now handling only simple cases, where 'by' columns are described by a single query compiler + agg_kwargs.get("as_index", True) + and len(not_broadcastable_by) == 0 + and len(broadcastable_by) == 1 + and broadcastable_by[0].has_materialized_dtypes + ): + new_index = ModinIndex( + # value can be anything here, as it will be reassigned on a parent update + value=self._modin_frame, + axis=0, + dtypes=broadcastable_by[0].dtypes, + ) + else: + new_index = None + new_modin_frame = self._modin_frame.broadcast_apply_full_axis( axis=axis, func=lambda df, by=None, partition_idx=None: groupby_agg_builder( df, by, drop, partition_idx ), other=broadcastable_by, + new_index=new_index, apply_indices=apply_indices, enumerate_partitions=True, ) diff --git a/modin/test/storage_formats/pandas/test_internals.py b/modin/test/storage_formats/pandas/test_internals.py index c205bf1be4b..c0e1bf90ec4 100644 --- a/modin/test/storage_formats/pandas/test_internals.py +++ b/modin/test/storage_formats/pandas/test_internals.py @@ -2319,3 +2319,50 @@ def test_preserve_dtypes_reset_index(self, drop, has_materialized_index): ) patch.assert_not_called() + + def test_groupby_index_dtype(self): + with mock.patch.object(PandasDataframe, "_compute_dtypes") as patch: + # case 1: MapReduce impl, Series as an output of groupby + df = pd.DataFrame({"a": [1, 2, 2], "b": [3, 4, 5]}) + res = df.groupby("a").size().reset_index(name="new_name") + res_dtypes = res._query_compiler._modin_frame._dtypes._value + assert "a" in res_dtypes._known_dtypes + assert res_dtypes._known_dtypes["a"] == np.dtype(int) + + # case 2: ExperimentalImpl impl, Series as an output of groupby + ExperimentalGroupbyImpl.put(True) + try: + df = pd.DataFrame({"a": [1, 2, 2], "b": [3, 4, 5]}) + res = df.groupby("a").size().reset_index(name="new_name") + res_dtypes = res._query_compiler._modin_frame._dtypes._value + assert "a" in res_dtypes._known_dtypes + assert res_dtypes._known_dtypes["a"] == np.dtype(int) + finally: + ExperimentalGroupbyImpl.put(False) + + # case 3: MapReduce impl, DataFrame as an output of groupby + df = pd.DataFrame({"a": [1, 2, 2], "b": [3, 4, 5]}) + res = df.groupby("a").sum().reset_index() + res_dtypes = res._query_compiler._modin_frame._dtypes._value + assert "a" in res_dtypes._known_dtypes + assert res_dtypes._known_dtypes["a"] == np.dtype(int) + + # case 4: ExperimentalImpl impl, DataFrame as an output of groupby + ExperimentalGroupbyImpl.put(True) + try: + df = pd.DataFrame({"a": [1, 2, 2], "b": [3, 4, 5]}) + res = df.groupby("a").sum().reset_index() + res_dtypes = res._query_compiler._modin_frame._dtypes._value + assert "a" in res_dtypes._known_dtypes + assert res_dtypes._known_dtypes["a"] == np.dtype(int) + finally: + ExperimentalGroupbyImpl.put(False) + + # case 5: FullAxis impl, DataFrame as an output of groupby + df = pd.DataFrame({"a": [1, 2, 2], "b": [3, 4, 5]}) + res = df.groupby("a").quantile().reset_index() + res_dtypes = res._query_compiler._modin_frame._dtypes._value + assert "a" in res_dtypes._known_dtypes + assert res_dtypes._known_dtypes["a"] == np.dtype(int) + + patch.assert_not_called()