Skip to content

Commit

Permalink
FIX-#5461: fix groupby if dataframe has empty partitions (#6307)
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoly Myachev <[email protected]>
  • Loading branch information
anmyachev authored and vnlitvinov committed Jul 3, 2023
1 parent 9852337 commit f883c25
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 2 deletions.
4 changes: 2 additions & 2 deletions modin/core/dataframe/base/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def window(
Notes
-----
The user-defined reduce function must reduce each windows column
The user-defined reduce function must reduce each window's column
(row if axis=1) down to a single value.
"""
pass
Expand Down Expand Up @@ -467,7 +467,7 @@ def from_labels(self) -> "ModinDataframe":
Notes
-----
In the case that the dataframe has hierarchical labels, all label "levels are inserted into the dataframe
In the case that the dataframe has hierarchical labels, all label "levels" are inserted into the dataframe
in the order they occur in the labels, with the outermost being in position 0.
"""
pass
Expand Down
3 changes: 3 additions & 0 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3584,6 +3584,9 @@ def groupby_reduce(
self._get_dict_of_block_index(axis ^ 1, numeric_indices).keys()
)

if by_parts is not None:
# inplace operation
self._filter_empties(compute_metadata=False)
new_partitions = self._partition_mgr_cls.groupby_reduce(
axis, self._partitions, by_parts, map_func, reduce_func, apply_indices
)
Expand Down
7 changes: 7 additions & 0 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,13 @@ def groupby_reduce(
)

if by is not None:
# need to make sure that the partitioning of the following objects
# coincides in the required axis, because `partition_manager.broadcast_apply`
# doesn't call `_copartition` unlike `modin_frame.broadcast_apply`
assert partitions.shape[axis] == by.shape[axis], (
f"the number of partitions along {axis=} is not equal: "
+ f"{partitions.shape[axis]} != {by.shape[axis]}"
)
mapped_partitions = cls.broadcast_apply(
axis, map_func, left=partitions, right=by
)
Expand Down
12 changes: 12 additions & 0 deletions modin/test/storage_formats/pandas/test_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,18 @@ def test_merge_partitioning(
)


def test_groupby_with_empty_partition():
    """
    Check that a groupby aggregation runs on a frame filtered down by ``query``.

    Regression test, see #5461 for details.
    """
    source = pandas.DataFrame({"a": [1, 1, 2, 2], "b": [3, 4, 5, 6]})
    scheme = {"row_lengths": [2, 2], "column_widths": [2]}
    modin_frame = construct_modin_df_by_scheme(
        pandas_df=source,
        partitioning_scheme=scheme,
    )
    # With two row partitions of two rows each, the ``a > 1`` filter keeps
    # no rows that came from the first partition.
    filtered = modin_frame.query("a > 1")
    # This used to fail with an index error caused by mismatched
    # partitioning; finishing without raising is the whole check.
    filtered.groupby("a").count()


@pytest.mark.parametrize("set_num_partitions", [2], indirect=True)
def test_repartitioning(set_num_partitions):
"""
Expand Down

0 comments on commit f883c25

Please sign in to comment.