Skip to content

Commit

Permalink
PERF-#6762: Carry dtypes information in lazy indices (#6763)
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <[email protected]>
  • Loading branch information
dchigarev authored Nov 21, 2023
1 parent 794ac6f commit b8323b5
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 15 deletions.
21 changes: 20 additions & 1 deletion modin/core/dataframe/algebra/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import pandas

from modin.core.dataframe.pandas.metadata import ModinIndex
from modin.error_message import ErrorMessage
from modin.utils import MODIN_UNNAMED_SERIES_LABEL, hashable

Expand Down Expand Up @@ -407,8 +408,26 @@ def caller(
# Otherwise `by` was already bound to the Map function in `build_map_reduce_functions`.
broadcastable_by = getattr(by, "_modin_frame", None)
apply_indices = list(map_func.keys()) if isinstance(map_func, dict) else None
if (
broadcastable_by is not None
and groupby_kwargs.get("as_index", True)
and broadcastable_by.has_materialized_dtypes
):
new_index = ModinIndex(
# value can be anything here, as it will be reassigned on a parent update
value=query_compiler._modin_frame,
axis=0,
dtypes=broadcastable_by.dtypes,
)
else:
new_index = None
new_modin_frame = query_compiler._modin_frame.groupby_reduce(
axis, broadcastable_by, map_fn, reduce_fn, apply_indices=apply_indices
axis,
broadcastable_by,
map_fn,
reduce_fn,
apply_indices=apply_indices,
new_index=new_index,
)

result = query_compiler.__constructor__(new_modin_frame)
Expand Down
6 changes: 6 additions & 0 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3911,6 +3911,12 @@ def join_cols(df, *cols):
row_lengths=result._row_lengths_cache,
)

if not result.has_materialized_index:
by_dtypes = ModinDtypes(self._dtypes).lazy_get(by)
if by_dtypes.is_materialized:
new_index = ModinIndex(value=result, axis=0, dtypes=by_dtypes)
result.set_index_cache(new_index)

if result_schema is not None:
new_dtypes = pandas.Series(result_schema)

Expand Down
26 changes: 24 additions & 2 deletions modin/core/dataframe/pandas/metadata/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,16 @@ class ModinIndex:
axis : int, optional
Specifies an axis the object represents, serves as an optional hint. This parameter
must be passed in case value is a ``PandasDataframe``.
dtypes : pandas.Series, optional
Materialized dtypes of index levels.
"""

def __init__(self, value, axis=None):
def __init__(self, value, axis=None, dtypes=None):
from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe

self._is_default_callable = False
self._axis = axis
self._dtypes = dtypes

if callable(value):
self._value = value
Expand All @@ -58,6 +61,25 @@ def __init__(self, value, axis=None):
self._index_id = uuid.uuid4()
self._lengths_id = uuid.uuid4()

def maybe_get_dtypes(self):
"""
Get index dtypes if available.
Returns
-------
pandas.Series or None
"""
if self._dtypes is not None:
return self._dtypes
if self.is_materialized:
self._dtypes = (
self._value.dtypes
if isinstance(self._value, pandas.MultiIndex)
else pandas.Series([self._value.dtype], index=[self._value.name])
)
return self._dtypes
return None

@staticmethod
def _get_default_callable(dataframe_obj, axis):
"""
Expand Down Expand Up @@ -308,7 +330,7 @@ def copy(self, copy_lengths=False) -> "ModinIndex":
idx_cache = self._value
if not callable(idx_cache):
idx_cache = idx_cache.copy()
result = ModinIndex(idx_cache, axis=self._axis)
result = ModinIndex(idx_cache, axis=self._axis, dtypes=self._dtypes)
result._index_id = self._index_id
result._is_default_callable = self._is_default_callable
if copy_lengths:
Expand Down
59 changes: 47 additions & 12 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
from modin.core.dataframe.pandas.metadata import (
DtypesDescriptor,
ModinDtypes,
ModinIndex,
extract_dtype,
)
from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler
Expand Down Expand Up @@ -589,16 +590,9 @@ def map_func(
[right_pandas.dtypes, right_index_dtypes]
)[right_renamer.keys()].rename(right_renamer)

left_index_dtypes = None
if self._modin_frame.has_materialized_index:
left_index_dtypes = (
self.index.dtypes
if isinstance(self.index, pandas.MultiIndex)
else pandas.Series(
[self.index.dtype], index=[self.index.name]
)
)

left_index_dtypes = (
self._modin_frame._index_cache.maybe_get_dtypes()
)
left_dtypes = (
ModinDtypes.concat(
[self._modin_frame._dtypes, left_index_dtypes]
Expand Down Expand Up @@ -755,12 +749,36 @@ def _reset(df, *axis_lengths, partition_idx): # pragma: no cover
copy_lengths=True
)
else:
# concat index dtypes (None, since they're unknown) with column dtypes
# concat index dtypes with column dtypes
index_dtypes = self._modin_frame._index_cache.maybe_get_dtypes()
try:
dtypes = ModinDtypes.concat([None, self._modin_frame._dtypes])
dtypes = ModinDtypes.concat(
[
index_dtypes,
self._modin_frame._dtypes,
]
)
except NotImplementedError:
# may raise on duplicated names in materialized 'self.dtypes'
dtypes = None
if (
# can precompute new columns if we know columns and index names
self._modin_frame.has_materialized_columns
and index_dtypes is not None
):
empty_index = (
pandas.Index([0], name=index_dtypes.index[0])
if len(index_dtypes) == 1
else pandas.MultiIndex.from_arrays(
[[i] for i in range(len(index_dtypes))],
names=index_dtypes.index,
)
)
new_columns = (
pandas.DataFrame(columns=self.columns, index=empty_index)
.reset_index(**kwargs)
.columns
)

return self.__constructor__(
self._modin_frame.apply_full_axis(
Expand Down Expand Up @@ -4124,12 +4142,29 @@ def compute_groupby(df, drop=False, partition_idx=0):
else:
apply_indices = None

if (
# For now handling only simple cases, where 'by' columns are described by a single query compiler
agg_kwargs.get("as_index", True)
and len(not_broadcastable_by) == 0
and len(broadcastable_by) == 1
and broadcastable_by[0].has_materialized_dtypes
):
new_index = ModinIndex(
# value can be anything here, as it will be reassigned on a parent update
value=self._modin_frame,
axis=0,
dtypes=broadcastable_by[0].dtypes,
)
else:
new_index = None

new_modin_frame = self._modin_frame.broadcast_apply_full_axis(
axis=axis,
func=lambda df, by=None, partition_idx=None: groupby_agg_builder(
df, by, drop, partition_idx
),
other=broadcastable_by,
new_index=new_index,
apply_indices=apply_indices,
enumerate_partitions=True,
)
Expand Down
47 changes: 47 additions & 0 deletions modin/test/storage_formats/pandas/test_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -2319,3 +2319,50 @@ def test_preserve_dtypes_reset_index(self, drop, has_materialized_index):
)

patch.assert_not_called()

def test_groupby_index_dtype(self):
with mock.patch.object(PandasDataframe, "_compute_dtypes") as patch:
# case 1: MapReduce impl, Series as an output of groupby
df = pd.DataFrame({"a": [1, 2, 2], "b": [3, 4, 5]})
res = df.groupby("a").size().reset_index(name="new_name")
res_dtypes = res._query_compiler._modin_frame._dtypes._value
assert "a" in res_dtypes._known_dtypes
assert res_dtypes._known_dtypes["a"] == np.dtype(int)

# case 2: ExperimentalImpl impl, Series as an output of groupby
ExperimentalGroupbyImpl.put(True)
try:
df = pd.DataFrame({"a": [1, 2, 2], "b": [3, 4, 5]})
res = df.groupby("a").size().reset_index(name="new_name")
res_dtypes = res._query_compiler._modin_frame._dtypes._value
assert "a" in res_dtypes._known_dtypes
assert res_dtypes._known_dtypes["a"] == np.dtype(int)
finally:
ExperimentalGroupbyImpl.put(False)

# case 3: MapReduce impl, DataFrame as an output of groupby
df = pd.DataFrame({"a": [1, 2, 2], "b": [3, 4, 5]})
res = df.groupby("a").sum().reset_index()
res_dtypes = res._query_compiler._modin_frame._dtypes._value
assert "a" in res_dtypes._known_dtypes
assert res_dtypes._known_dtypes["a"] == np.dtype(int)

# case 4: ExperimentalImpl impl, DataFrame as an output of groupby
ExperimentalGroupbyImpl.put(True)
try:
df = pd.DataFrame({"a": [1, 2, 2], "b": [3, 4, 5]})
res = df.groupby("a").sum().reset_index()
res_dtypes = res._query_compiler._modin_frame._dtypes._value
assert "a" in res_dtypes._known_dtypes
assert res_dtypes._known_dtypes["a"] == np.dtype(int)
finally:
ExperimentalGroupbyImpl.put(False)

# case 5: FullAxis impl, DataFrame as an output of groupby
df = pd.DataFrame({"a": [1, 2, 2], "b": [3, 4, 5]})
res = df.groupby("a").quantile().reset_index()
res_dtypes = res._query_compiler._modin_frame._dtypes._value
assert "a" in res_dtypes._known_dtypes
assert res_dtypes._known_dtypes["a"] == np.dtype(int)

patch.assert_not_called()

0 comments on commit b8323b5

Please sign in to comment.