Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PERF-#6762: Carry dtypes information in lazy indices #6763

Merged
merged 2 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion modin/core/dataframe/algebra/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import pandas

from modin.core.dataframe.pandas.metadata import ModinIndex
from modin.error_message import ErrorMessage
from modin.utils import MODIN_UNNAMED_SERIES_LABEL, hashable

Expand Down Expand Up @@ -407,8 +408,26 @@ def caller(
# Otherwise `by` was already bound to the Map function in `build_map_reduce_functions`.
broadcastable_by = getattr(by, "_modin_frame", None)
apply_indices = list(map_func.keys()) if isinstance(map_func, dict) else None
if (
broadcastable_by is not None
and groupby_kwargs.get("as_index", True)
and broadcastable_by.has_materialized_dtypes
):
new_index = ModinIndex(
# value can be anything here, as it will be reassigned on a parent update
value=query_compiler._modin_frame,
axis=0,
dtypes=broadcastable_by.dtypes,
)
else:
new_index = None
new_modin_frame = query_compiler._modin_frame.groupby_reduce(
axis, broadcastable_by, map_fn, reduce_fn, apply_indices=apply_indices
axis,
broadcastable_by,
map_fn,
reduce_fn,
apply_indices=apply_indices,
new_index=new_index,
)

result = query_compiler.__constructor__(new_modin_frame)
Expand Down
6 changes: 6 additions & 0 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3911,6 +3911,12 @@ def join_cols(df, *cols):
row_lengths=result._row_lengths_cache,
)

if not result.has_materialized_index:
by_dtypes = ModinDtypes(self._dtypes).lazy_get(by)
if by_dtypes.is_materialized:
new_index = ModinIndex(value=result, axis=0, dtypes=by_dtypes)
result.set_index_cache(new_index)

if result_schema is not None:
new_dtypes = pandas.Series(result_schema)

Expand Down
26 changes: 24 additions & 2 deletions modin/core/dataframe/pandas/metadata/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,16 @@ class ModinIndex:
axis : int, optional
Specifies an axis the object represents, serves as an optional hint. This parameter
must be passed in case value is a ``PandasDataframe``.
dtypes : pandas.Series, optional
Materialized dtypes of index levels.
"""

def __init__(self, value, axis=None):
def __init__(self, value, axis=None, dtypes=None):
from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe

self._is_default_callable = False
self._axis = axis
self._dtypes = dtypes

if callable(value):
self._value = value
Expand All @@ -58,6 +61,25 @@ def __init__(self, value, axis=None):
self._index_id = uuid.uuid4()
self._lengths_id = uuid.uuid4()

def maybe_get_dtypes(self):
    """
    Return dtypes of the index levels if they can be obtained without materialization.

    Previously stored dtypes are returned as is. Otherwise, when the index value
    is already materialized, dtypes are derived from it and stored on the object
    so subsequent calls reuse them.

    Returns
    -------
    pandas.Series or None
        Dtypes of the index levels, or None when no dtypes are stored
        and the index is not materialized.
    """
    known_dtypes = self._dtypes
    if known_dtypes is None and self.is_materialized:
        index = self._value
        if isinstance(index, pandas.MultiIndex):
            known_dtypes = index.dtypes
        else:
            # a flat index has a single level: describe it with a one-element
            # Series labeled by the index name
            known_dtypes = pandas.Series([index.dtype], index=[index.name])
        self._dtypes = known_dtypes
    return known_dtypes

@staticmethod
def _get_default_callable(dataframe_obj, axis):
"""
Expand Down Expand Up @@ -308,7 +330,7 @@ def copy(self, copy_lengths=False) -> "ModinIndex":
idx_cache = self._value
if not callable(idx_cache):
idx_cache = idx_cache.copy()
result = ModinIndex(idx_cache, axis=self._axis)
result = ModinIndex(idx_cache, axis=self._axis, dtypes=self._dtypes)
result._index_id = self._index_id
result._is_default_callable = self._is_default_callable
if copy_lengths:
Expand Down
59 changes: 47 additions & 12 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
from modin.core.dataframe.pandas.metadata import (
DtypesDescriptor,
ModinDtypes,
ModinIndex,
extract_dtype,
)
from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler
Expand Down Expand Up @@ -589,16 +590,9 @@ def map_func(
[right_pandas.dtypes, right_index_dtypes]
)[right_renamer.keys()].rename(right_renamer)

left_index_dtypes = None
if self._modin_frame.has_materialized_index:
left_index_dtypes = (
self.index.dtypes
if isinstance(self.index, pandas.MultiIndex)
else pandas.Series(
[self.index.dtype], index=[self.index.name]
)
)

left_index_dtypes = (
self._modin_frame._index_cache.maybe_get_dtypes()
)
left_dtypes = (
ModinDtypes.concat(
[self._modin_frame._dtypes, left_index_dtypes]
Expand Down Expand Up @@ -755,12 +749,36 @@ def _reset(df, *axis_lengths, partition_idx): # pragma: no cover
copy_lengths=True
)
else:
# concat index dtypes (None, since they're unknown) with column dtypes
# concat index dtypes with column dtypes
index_dtypes = self._modin_frame._index_cache.maybe_get_dtypes()
try:
dtypes = ModinDtypes.concat([None, self._modin_frame._dtypes])
dtypes = ModinDtypes.concat(
[
index_dtypes,
self._modin_frame._dtypes,
]
)
except NotImplementedError:
# may raise on duplicated names in materialized 'self.dtypes'
dtypes = None
if (
# can precompute new columns if we know columns and index names
self._modin_frame.has_materialized_columns
and index_dtypes is not None
):
empty_index = (
pandas.Index([0], name=index_dtypes.index[0])
if len(index_dtypes) == 1
else pandas.MultiIndex.from_arrays(
[[i] for i in range(len(index_dtypes))],
names=index_dtypes.index,
)
)
new_columns = (
pandas.DataFrame(columns=self.columns, index=empty_index)
.reset_index(**kwargs)
.columns
)

return self.__constructor__(
self._modin_frame.apply_full_axis(
Expand Down Expand Up @@ -4124,12 +4142,29 @@ def compute_groupby(df, drop=False, partition_idx=0):
else:
apply_indices = None

if (
# For now handling only simple cases, where 'by' columns are described by a single query compiler
agg_kwargs.get("as_index", True)
dchigarev marked this conversation as resolved.
Show resolved Hide resolved
and len(not_broadcastable_by) == 0
and len(broadcastable_by) == 1
Comment on lines +4148 to +4149
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why such restrictions?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These restrictions describe the simple case where the 'by' keys are columns of the same dataframe. I don't want to handle other cases in this PR, since pandas is always tricky in how it handles groupby with a mixed 'by'.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added an in-code comment about this

and broadcastable_by[0].has_materialized_dtypes
):
new_index = ModinIndex(
# value can be anything here, as it will be reassigned on a parent update
value=self._modin_frame,
axis=0,
dtypes=broadcastable_by[0].dtypes,
)
else:
new_index = None

new_modin_frame = self._modin_frame.broadcast_apply_full_axis(
axis=axis,
func=lambda df, by=None, partition_idx=None: groupby_agg_builder(
df, by, drop, partition_idx
),
other=broadcastable_by,
new_index=new_index,
apply_indices=apply_indices,
enumerate_partitions=True,
)
Expand Down
47 changes: 47 additions & 0 deletions modin/test/storage_formats/pandas/test_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -2319,3 +2319,50 @@ def test_preserve_dtypes_reset_index(self, drop, has_materialized_index):
)

patch.assert_not_called()

def test_groupby_index_dtype(self):
    """
    Check that the dtype of the 'by' column is known after ``groupby(...).reset_index()``.

    Every case builds the same small frame, groups it by column "a", resets the
    index and verifies that the dtype of "a" is present in the known dtypes of
    the result, while ``PandasDataframe._compute_dtypes`` is never called.
    """

    def check_by_dtype_is_known(groupby_op):
        # Build a fresh frame for every case so the cases don't share cached metadata
        df = pd.DataFrame({"a": [1, 2, 2], "b": [3, 4, 5]})
        res = groupby_op(df.groupby("a"))
        res_dtypes = res._query_compiler._modin_frame._dtypes._value
        assert "a" in res_dtypes._known_dtypes
        assert res_dtypes._known_dtypes["a"] == np.dtype(int)

    # (value for 'ExperimentalGroupbyImpl', groupby operation to verify)
    cases = [
        # case 1: MapReduce impl, Series as an output of groupby
        (False, lambda grp: grp.size().reset_index(name="new_name")),
        # case 2: ExperimentalImpl impl, Series as an output of groupby
        (True, lambda grp: grp.size().reset_index(name="new_name")),
        # case 3: MapReduce impl, DataFrame as an output of groupby
        (False, lambda grp: grp.sum().reset_index()),
        # case 4: ExperimentalImpl impl, DataFrame as an output of groupby
        (True, lambda grp: grp.sum().reset_index()),
        # case 5: FullAxis impl, DataFrame as an output of groupby
        (False, lambda grp: grp.quantile().reset_index()),
    ]

    # Remember the active value so it's restored afterwards instead of being
    # forced to 'False', which would leak into tests running after this one
    initial_impl = ExperimentalGroupbyImpl.get()
    with mock.patch.object(PandasDataframe, "_compute_dtypes") as patch:
        try:
            for use_experimental, groupby_op in cases:
                # select the implementation explicitly rather than relying
                # on whatever value happens to be configured
                ExperimentalGroupbyImpl.put(use_experimental)
                check_by_dtype_is_known(groupby_op)
        finally:
            ExperimentalGroupbyImpl.put(initial_impl)

        patch.assert_not_called()
Loading