Skip to content

Commit

Permalink
FIX-#6607: Fix incorrect cache after '.sort_values()' (#6608)
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <[email protected]>
  • Loading branch information
dchigarev authored and anmyachev committed Oct 30, 2023
1 parent 4d10294 commit 1347797
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 20 deletions.
34 changes: 24 additions & 10 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2265,7 +2265,7 @@ def combine_and_apply(
)

def _apply_func_to_range_partitioning(
self, key_column, func, ascending=True, **kwargs
self, key_column, func, ascending=True, preserve_columns=False, **kwargs
):
"""
Reshuffle data so it would be range partitioned and then apply the passed function row-wise.
Expand All @@ -2278,6 +2278,8 @@ def _apply_func_to_range_partitioning(
Function to apply against partitions.
ascending : bool, default: True
Whether the range should be built in ascending or descending order.
preserve_columns : bool, default: False
Whether the columns cache should be preserved (set this flag only if `func` doesn't change column labels).
**kwargs : dict
Additional arguments to forward to the range builder function.
Expand All @@ -2288,7 +2290,14 @@ def _apply_func_to_range_partitioning(
"""
# If there's only one row partition can simply apply the function row-wise without the need to reshuffle
if self._partitions.shape[0] == 1:
return self.apply_full_axis(axis=1, func=func)
result = self.apply_full_axis(
axis=1,
func=func,
new_columns=self.copy_columns_cache() if preserve_columns else None,
)
if preserve_columns:
result._set_axis_lengths_cache(self._column_widths_cache, axis=1)
return result

ideal_num_new_partitions = len(self._partitions)
m = len(self.index) / ideal_num_new_partitions
Expand Down Expand Up @@ -2365,7 +2374,14 @@ def _apply_func_to_range_partitioning(
func,
)

return self.__constructor__(new_partitions)
result = self.__constructor__(new_partitions)
if preserve_columns:
result.set_columns_cache(self.copy_columns_cache())
# The final step applies `func` to full-axis partitions, so every resulting partition
# spans all of the dataframe's columns, i.e. there is a single column partition.
if self.has_materialized_columns:
result._set_axis_lengths_cache([len(self.columns)], axis=1)
return result

@lazy_metadata_decorator(apply_axis="both")
def sort_by(
Expand Down Expand Up @@ -2422,15 +2438,13 @@ def sort_function(df): # pragma: no cover
)

result = self._apply_func_to_range_partitioning(
key_column=columns[0], func=sort_function, ascending=ascending, **kwargs
key_column=columns[0],
func=sort_function,
ascending=ascending,
preserve_columns=True,
**kwargs,
)

result.set_axis_cache(self.copy_axis_cache(axis.value ^ 1), axis=axis.value ^ 1)
result.set_dtypes_cache(self.copy_dtypes_cache())
# We perform the final steps of the sort on full axis partitions, so we know that the
# length of each partition is the full length of the dataframe.
if self.has_materialized_columns:
result._set_axis_lengths_cache([len(self.columns)], axis=axis.value ^ 1)

if kwargs.get("ignore_index", False):
result.index = RangeIndex(len(self.get_axis(axis.value)))
Expand Down
70 changes: 60 additions & 10 deletions modin/test/storage_formats/pandas/test_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,30 @@ def construct_modin_df_by_scheme(pandas_df, partitioning_scheme):
return md_df


def validate_partitions_cache(df):
"""Assert that the ``PandasDataframe`` shape caches correspond to the actual partition's shapes."""
row_lengths = df._row_lengths_cache
column_widths = df._column_widths_cache
def validate_partitions_cache(df, axis=None):
"""
Assert that the ``PandasDataframe`` shape caches correspond to the actual partition's shapes.
assert row_lengths is not None
assert column_widths is not None
assert df._partitions.shape[0] == len(row_lengths)
assert df._partitions.shape[1] == len(column_widths)
Parameters
----------
df : PandasDataframe
axis : int, optional
An axis to verify the cache for. If not specified, verify cache for both of the axes.
"""
axis = [0, 1] if axis is None else [axis]

axis_lengths = [df._row_lengths_cache, df._column_widths_cache]

for ax in axis:
assert axis_lengths[ax] is not None
assert df._partitions.shape[ax] == len(axis_lengths[ax])

for i in range(df._partitions.shape[0]):
for j in range(df._partitions.shape[1]):
assert df._partitions[i, j].length() == row_lengths[i]
assert df._partitions[i, j].width() == column_widths[j]
if 0 in axis:
assert df._partitions[i, j].length() == axis_lengths[0][i]
if 1 in axis:
assert df._partitions[i, j].width() == axis_lengths[1][j]


def test_aligning_blocks():
Expand Down Expand Up @@ -1109,3 +1119,43 @@ def test_query_dispatching():
qc.rowwise_query("a < (b + @local_var + (b - e.min())) * c > 10")
with pytest.raises(NotImplementedError):
qc.rowwise_query("a < b.size")


def test_sort_values_cache():
    """
    Check that ``.sort_values()`` leaves a valid column widths cache behind.

    Regression test for https://github.com/modin-project/modin/issues/6607
    """
    scenarios = (
        # 1 row partition and 2 column partitions: '.sort_values()' takes the
        # row-wise implementation, so the column widths are expected to stay as they were
        ([100], [32, 32]),
        # 2 row partitions and 2 column partitions: '.sort_values()' takes the
        # range-partitioning implementation, so the column widths are expected to change
        ([50, 50], [64]),
    )

    for row_lengths, expected_widths in scenarios:
        source_df = construct_modin_df_by_scheme(
            pandas.DataFrame({f"col{i}": range(100) for i in range(64)}),
            partitioning_scheme={
                "row_lengths": row_lengths,
                "column_widths": [32, 32],
            },
        )
        frame_before = source_df._query_compiler._modin_frame
        frame_after = source_df.sort_values("col0")._query_compiler._modin_frame

        # the expected implementation was indeed used (judged by the resulting col widths)
        assert frame_after._column_widths_cache == expected_widths
        # the cached col widths agree with the actual partition widths
        validate_partitions_cache(frame_after, axis=1)
        # the cache of the frame we started from must be left untouched
        assert frame_before._column_widths_cache == [32, 32]
        validate_partitions_cache(frame_before, axis=1)

0 comments on commit 1347797

Please sign in to comment.