Try commits for Release 0.23.2 #6686

Closed

32 commits
ed0eba4  FIX-#6479: HDK CalciteBuilder: Do not call is_bool_dtype() for catego… (AndreyPavlenko, Aug 18, 2023)
92da6b5  FIX-#6509: Fix 'reshuffling' in case of a string key (#6510) (dchigarev, Aug 26, 2023)
a6301c5  FIX-#6514: test_sort_cols_str from test_dataframe.py crashed on HDK 0… (AndreyPavlenko, Aug 28, 2023)
19b3696  FIX-#6465: Fix `groupby.apply()` for UDFs that change the output's sh… (dchigarev, Aug 28, 2023)
77f75b3  FIX-#6516: HDK: test_dataframe.py is crashed if Calcite is disabled (… (AndreyPavlenko, Aug 29, 2023)
fd174ec  FIX-#6519: consider 'botocore' as an optional dependency (#6521) (anmyachev, Aug 30, 2023)
fcc404a  FIX-#4347: read_excel: defaults to pandas for unsupported types of 'i… (anmyachev, Aug 31, 2023)
afbf466  FIX-#6518: fix interchange protocol for string columns (#6523) (anmyachev, Aug 31, 2023)
8808bd5  FIX-#5536: remove branch disabling '__getattribute__' for experimenta… (anmyachev, Sep 1, 2023)
fb43709  FIX-#4687: change 'Column.null_count' to return a builtin int instead… (anmyachev, Sep 5, 2023)
be29452  FIX-#6535: Pin 's3fs<2023.9.0' (#6536) (dchigarev, Sep 5, 2023)
75d4aab  FIX-#6532: fix 'read_excel' so that it doesn't use 'rich_text' param … (anmyachev, Sep 6, 2023)
f09385e  FIX-#6541: fix 'ValueError: buffer source array is read-only' for 'il… (anmyachev, Sep 8, 2023)
c19f73e  FIX-#6537: Unpin 's3fs<2023.9.0' (#6544) (anmyachev, Sep 11, 2023)
1e53b08  FIX-#6553: fix 'read_csv' with 'iterator=True' (#6554) (anmyachev, Sep 13, 2023)
92432fa  FIX-#5164: Fix unwrap_partitions for virtual partitions when `axis=No… (anmyachev, Sep 15, 2023)
60363d9  FIX-#6572: Execute simple queries row-wise in pandas backend (#6575) (dchigarev, Sep 18, 2023)
4d10294  FIX-#6601: 'sort_values' shouldn't affect source dataframe/series (#6… (anmyachev, Sep 26, 2023)
1347797  FIX-#6607: Fix incorrect cache after '.sort_values()' (#6608) (dchigarev, Sep 27, 2023)
aa91806  FIX-#6602: refactor `join` to avoid `distributing a dict object` warn… (anmyachev, Sep 29, 2023)
dddd847  FIX-#6600: fix usage of list of UDF functions in 'Series.groupby.agg'… (anmyachev, Sep 29, 2023)
9d188d4  FIX-#6628: Allow groupby diff for dates (#6631) (Garra1980, Oct 5, 2023)
80df280  FIX-#6635: HDK: read_csv(): treat object dtype as string (#6636) (AndreyPavlenko, Oct 9, 2023)
1a50916  FIX-#6637: Fix 'skiprows' parameter usage for 'read_excel' (#6638) (anmyachev, Oct 9, 2023)
f865768  FIX-#6642: fix 'modin.numpy.array.sum' on HDK (#6643) (anmyachev, Oct 11, 2023)
27a4917  FIX-#4507: Do not call 'ray.get()' inside of the kernel executing cal… (dchigarev, Oct 11, 2023)
2b73720  FIX-#6647: Added init file to make modin/experimental/sql/hdk/query.p… (Egor-Krivov, Oct 13, 2023)
712cfbe  FIX-#6651: make sure `Series.between` works correctly (#6656) (anmyachev, Oct 17, 2023)
ed7ade5  FIX-#6632: Return Series instead of Dataframe for groupby.apply in ca… (Garra1980, Oct 21, 2023)
34a3754  FIX-#6680: Specify navigation_with_keys=True to fix docs build (#6681) (Garra1980, Oct 25, 2023)
271ba82  FIX-#6594: fix usage of Modin objects inside UDFs for `apply` (#6673) (anmyachev, Nov 14, 2023)
c7607ce  pin unidist<=0.4.1 (anmyachev, Nov 15, 2023)
1 change: 1 addition & 0 deletions docs/conf.py
@@ -176,6 +176,7 @@ def noop_decorator(*args, **kwargs):
             "icon": "fas fa-envelope-square",
         },
     ],
+    "navigation_with_keys": True,
 }

 # Custom sidebar templates, must be a dictionary that maps document names
25 changes: 25 additions & 0 deletions modin/conftest.py
@@ -712,3 +712,28 @@
         if not cli.list_buckets()["Buckets"]:
             break
         time.sleep(0.1)
+
+
+@pytest.fixture
+def modify_config(request):
+    values = request.param
+    old_values = {}
+
+    for key, value in values.items():
+        old_values[key] = key.get()
+        key.put(value)
+
+    yield  # wait for the test to complete
+    # restore the old parameter values
+    for key, value in old_values.items():
+        try:
+            key.put(value)
+        except ValueError as e:
+            # Some bool env variables have 'None' as the default value, which
+            # causes a ValueError when we try to set this value back since,
+            # technically, only bool values are allowed (and 'None' is not a
+            # bool); in this case we try to set 'False' instead
+            if key.type == bool and value is None:
+                key.put(False)
+            else:
+                raise e

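Note: the fixture above reads `request.param`, so it is meant to be parametrized indirectly. A minimal usage sketch under that assumption (the test itself is illustrative, but `NPartitions` is a real `modin.config` option with `get()`/`put()`):

```python
import pytest

from modin.config import NPartitions


@pytest.mark.parametrize("modify_config", [{NPartitions: 2}], indirect=True)
def test_with_modified_config(modify_config):
    # While the test runs, NPartitions is set to 2; on teardown the
    # fixture restores whatever value was in effect before.
    assert NPartitions.get() == 2
```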
148 changes: 133 additions & 15 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -31,7 +31,7 @@
 from pandas._libs.lib import no_default
 from typing import List, Hashable, Optional, Callable, Union, Dict, TYPE_CHECKING

-from modin.config import Engine
+from modin.config import Engine, IsRayCluster
 from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler
 from modin.core.storage_formats.pandas.utils import get_length_list
 from modin.error_message import ErrorMessage
@@ -2265,7 +2265,7 @@
         )

     def _apply_func_to_range_partitioning(
-        self, key_column, func, ascending=True, **kwargs
+        self, key_column, func, ascending=True, preserve_columns=False, **kwargs
     ):
         """
         Reshuffle data so it would be range partitioned and then apply the passed function row-wise.
@@ -2278,6 +2278,8 @@
             Function to apply against partitions.
         ascending : bool, default: True
             Whether the range should be built in ascending or descending order.
+        preserve_columns : bool, default: False
+            If the columns cache should be preserved (specify this flag if `func` doesn't change column labels).
         **kwargs : dict
             Additional arguments to forward to the range builder function.

@@ -2288,7 +2290,14 @@
         """
         # If there's only one row partition, we can simply apply the function row-wise without the need to reshuffle
         if self._partitions.shape[0] == 1:
-            return self.apply_full_axis(axis=1, func=func)
+            result = self.apply_full_axis(
+                axis=1,
+                func=func,
+                new_columns=self.copy_columns_cache() if preserve_columns else None,
+            )
+            if preserve_columns:
+                result._set_axis_lengths_cache(self._column_widths_cache, axis=1)
+            return result

         ideal_num_new_partitions = len(self._partitions)
         m = len(self.index) / ideal_num_new_partitions
@@ -2311,12 +2320,12 @@
             # simply combine all partitions and apply the sorting to the whole dataframe
             return self.combine_and_apply(func=func)

-        if self.dtypes[key_column] == object:
+        if is_numeric_dtype(self.dtypes[key_column]):
+            method = "linear"
+        else:
             # This means we are not sorting numbers, so we need our quantiles to not try
             # arithmetic on the values.
             method = "inverted_cdf"
-        else:
-            method = "linear"

         shuffling_functions = build_sort_functions(
             self,
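Note: the `method` selection above matters because "linear" interpolates between sample values, which requires arithmetic on them, while "inverted_cdf" always returns an element of the input, so it also works for keys that merely support ordering. A standalone NumPy sketch, not Modin code:

```python
import numpy as np

vals = np.array([1, 2, 10])

# 'linear' may interpolate to a value that is not in the data.
print(np.quantile(vals, 0.25, method="linear"))        # 1.5

# 'inverted_cdf' picks an existing element, needing only an ordering.
print(np.quantile(vals, 0.25, method="inverted_cdf"))  # 1
```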
@@ -2365,7 +2374,14 @@
             func,
         )

-        return self.__constructor__(new_partitions)
+        result = self.__constructor__(new_partitions)
+        if preserve_columns:
+            result.set_columns_cache(self.copy_columns_cache())
+            # We perform the final steps of the sort on full axis partitions, so we know that the
+            # length of each partition is the full length of the dataframe.
+            if self.has_materialized_columns:
+                result._set_axis_lengths_cache([len(self.columns)], axis=1)
+        return result

     @lazy_metadata_decorator(apply_axis="both")
     def sort_by(
@@ -2422,15 +2438,13 @@
         )

         result = self._apply_func_to_range_partitioning(
-            key_column=columns[0], func=sort_function, ascending=ascending, **kwargs
+            key_column=columns[0],
+            func=sort_function,
+            ascending=ascending,
+            preserve_columns=True,
+            **kwargs,
         )
-
-        result.set_axis_cache(self.copy_axis_cache(axis.value ^ 1), axis=axis.value ^ 1)
         result.set_dtypes_cache(self.copy_dtypes_cache())
-        # We perform the final steps of the sort on full axis partitions, so we know that the
-        # length of each partition is the full length of the dataframe.
-        if self.has_materialized_columns:
-            self._set_axis_lengths_cache([len(self.columns)], axis=axis.value ^ 1)

         if kwargs.get("ignore_index", False):
             result.index = RangeIndex(len(self.get_axis(axis.value)))
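Note: the `ignore_index` branch above mirrors pandas semantics, where the sorted result is given a fresh `RangeIndex`. A plain pandas sketch:

```python
import pandas as pd

df = pd.DataFrame({"a": [3, 1, 2]}, index=[10, 11, 12])
print(df.sort_values("a", ignore_index=True).index)
# RangeIndex(start=0, stop=3, step=1)
```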
@@ -3495,6 +3509,7 @@
         by: Union[str, List[str]],
         operator: Callable,
         result_schema: Optional[Dict[Hashable, type]] = None,
+        align_result_columns=False,
         **kwargs: dict,
     ) -> "PandasDataframe":
         """
@@ -3512,6 +3527,10 @@
             on the output desired by the user.
         result_schema : dict, optional
             Mapping from column labels to data types that represents the types of the output dataframe.
+        align_result_columns : bool, default: False
+            Whether to manually align columns between all the resulting row partitions.
+            This flag is helpful when dealing with UDFs, as they can change the partition's shape
+            and labeling unpredictably, resulting in an invalid dataframe.
         **kwargs : dict
             Additional arguments to pass to the ``df.groupby`` method (besides the 'by' argument).

@@ -3541,19 +3560,118 @@
         if not isinstance(by, list):
             by = [by]

+        skip_on_aligning_flag = "__skip_me_on_aligning__"
+
         def apply_func(df):  # pragma: no cover
             if any(is_categorical_dtype(dtype) for dtype in df.dtypes[by].values):
                 raise NotImplementedError(
                     "Reshuffling groupby is not yet supported when grouping on a categorical column. "
                     + "https://github.com/modin-project/modin/issues/5925"
                 )
-            return operator(df.groupby(by, **kwargs))
+            result = operator(df.groupby(by, **kwargs))
+            if (
+                align_result_columns
+                and df.empty
+                and result.empty
+                and df.columns.equals(result.columns)
+            ):
+                # We want to align columns only of those frames that actually performed
+                # some groupby aggregation: if an empty frame was originally passed
+                # (an empty bin was created on reshuffling), then no groupby was
+                # executed over this partition, so it has incorrect columns
+                # that shouldn't be considered in the aligning phase
+                result.attrs[skip_on_aligning_flag] = True
+            return result

         result = self._apply_func_to_range_partitioning(
             key_column=by[0],
             func=apply_func,
         )
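Note: the `skip_on_aligning_flag` marker travels with each partition through `pandas.DataFrame.attrs`, a free-form metadata dict attached to a frame. A minimal sketch of the round-trip:

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 2]})
df.attrs["__skip_me_on_aligning__"] = True

# During the aligning phase the marker is read back with a default:
print(df.attrs.get("__skip_me_on_aligning__", False))  # True
```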

+        # No need to align columns if there's only one row partition
+        if align_result_columns and result._partitions.shape[0] > 1:
+            # FIXME: the current reshuffling implementation guarantees that there's only one column
+            # partition in the result, so we should never hit this exception for now; however,
+            # in the future we might want to make this implementation more general
+            if result._partitions.shape[1] > 1:
+                raise NotImplementedError(
+                    "Aligning columns is not yet implemented for multiple column partitions."
+                )
+
+            # There are two implementations:
+            # 1. The first one works faster, but may stress the network a lot in cluster mode, since
+            #    it gathers all the dataframes in a single Ray kernel.
+            # 2. The second one works slower, but only gathers light pandas.Index objects,
+            #    so there should be less stress on the network.
+            if not IsRayCluster.get():
+
+                def compute_aligned_columns(*dfs):
+                    """Take row partitions, filter empty ones, and return joined columns for them."""
+                    valid_dfs = [
+                        df
+                        for df in dfs
+                        if not df.attrs.get(skip_on_aligning_flag, False)
+                    ]
+                    if len(valid_dfs) == 0 and len(dfs) != 0:
+                        valid_dfs = dfs
+
+                    # Using '.concat()' on empty slices instead of 'Index.join()'
+                    # in order to get behavior identical to how pandas joins
+                    # the results of different groups
+                    return pandas.concat(
+                        [df.iloc[:0] for df in valid_dfs], axis=0, join="outer"
+                    ).columns
+
+                # Passing all partitions to the 'compute_aligned_columns' kernel to get
+                # the aligned columns
+                parts = result._partitions.flatten()
+                aligned_columns = parts[0].apply(
+                    compute_aligned_columns, *[part._data for part in parts[1:]]
+                )
+
+                # Lazily applying the aligned columns to partitions
+                new_partitions = self._partition_mgr_cls.lazy_map_partitions(
+                    result._partitions,
+                    lambda df, columns: df.reindex(columns=columns),
+                    func_args=(aligned_columns._data,),
+                )
+            else:
+
+                def join_cols(df, *cols):
+                    """Join `cols` and apply the joined columns to `df`."""
+                    valid_cols = [
+                        pandas.DataFrame(columns=col) for col in cols if col is not None
+                    ]
+                    if len(valid_cols) == 0:
+                        return df
+
+                    # Using '.concat()' on empty slices instead of 'Index.join()'
+                    # in order to get behavior identical to how pandas joins
+                    # the results of different groups
+                    result_col = pandas.concat(valid_cols, axis=0, join="outer").columns
+                    return df.reindex(columns=result_col)
+
+                # Getting futures for the columns of non-empty partitions
+                cols = [
+                    part.apply(
+                        lambda df: None
+                        if df.attrs.get(skip_on_aligning_flag, False)
+                        else df.columns
+                    )._data
+                    for part in result._partitions.flatten()
+                ]
+
+                # Lazily joining and applying the aligned columns
+                new_partitions = self._partition_mgr_cls.lazy_map_partitions(
+                    result._partitions,
+                    join_cols,
+                    func_args=cols,
+                )
+            result = self.__constructor__(
+                new_partitions,
+                index=result.copy_index_cache(),
+                row_lengths=result._row_lengths_cache,
+            )

         if result_schema is not None:
             new_dtypes = pandas.Series(result_schema)
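Note: both branches rely on the same trick: concatenate zero-row slices of the partitions with an outer join and take the columns of the result, which reproduces how pandas itself joins the results of different groups while shipping no row data. A standalone pandas sketch:

```python
import pandas as pd

part1 = pd.DataFrame({"a": [1], "b": [2]})
part2 = pd.DataFrame({"b": [3], "c": [4]})

# Zero-row slices keep the schema but carry no data.
aligned = pd.concat([part1.iloc[:0], part2.iloc[:0]], axis=0, join="outer").columns
print(list(aligned))  # ['a', 'b', 'c']

# Each partition is then reindexed to the aligned columns.
print(part1.reindex(columns=aligned))
```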
@@ -251,7 +251,7 @@ def reduce_func(df):
         # Otherwise, we get mismatching internal and external indices for both axes
         intermediate_df.index = pandas.RangeIndex(1)
         intermediate_df.columns = pandas.RangeIndex(1)
-        self._null_count_cache = intermediate_df.to_pandas().squeeze()
+        self._null_count_cache = intermediate_df.to_pandas().squeeze(axis=1).item()
         return self._null_count_cache

     @property
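Note: the added `.item()` is what turns a NumPy scalar into the builtin `int` that the interchange protocol specifies for `null_count`. A plain pandas sketch of the distinction:

```python
import pandas as pd

s = pd.Series([1.0, None, 3.0])
count = s.isna().sum()

print(type(count))         # <class 'numpy.int64'>
print(type(count.item()))  # <class 'int'>
```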
@@ -411,15 +411,18 @@ def _get_validity_buffer(self) -> Tuple[PandasProtocolBuffer, Any]:
         buf = self._col.to_numpy().flatten()

         # Determine the encoding for valid values
-        valid = 1 if invalid == 0 else 0
+        valid = invalid == 0
+        invalid = not valid

-        mask = [valid if type(buf[i]) is str else invalid for i in range(buf.size)]
+        mask = np.empty(shape=(len(buf),), dtype=np.bool_)
+        for i, obj in enumerate(buf):
+            mask[i] = valid if isinstance(obj, str) else invalid

         # Convert the mask array to a Pandas "buffer" using a NumPy array as the backing store
-        buffer = PandasProtocolBuffer(np.asarray(mask, dtype="uint8"))
+        buffer = PandasProtocolBuffer(mask)

         # Define the dtype of the returned buffer
-        dtype = (DTypeKind.UINT, 8, "C", "=")
+        dtype = (DTypeKind.BOOL, 8, "b", "=")

         self._validity_buffer_cache = (buffer, dtype)
         return self._validity_buffer_cache
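Note: the new mask is a boolean NumPy array in which `True` marks a valid (string) entry of the object-dtype column, matching the `DTypeKind.BOOL` buffer dtype declared above. A standalone sketch of the construction:

```python
import numpy as np

buf = np.array(["x", None, "y"], dtype=object)

valid, invalid = True, False
mask = np.empty(shape=(len(buf),), dtype=np.bool_)
for i, obj in enumerate(buf):
    mask[i] = valid if isinstance(obj, str) else invalid

print(mask)  # [ True False  True]
```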
12 changes: 10 additions & 2 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
@@ -530,7 +530,7 @@ def map_partitions(cls, partitions, map_func):

     @classmethod
     @wait_computations_if_benchmark_mode
-    def lazy_map_partitions(cls, partitions, map_func):
+    def lazy_map_partitions(cls, partitions, map_func, func_args=None):
         """
         Apply `map_func` to every partition in `partitions` *lazily*.

@@ -540,6 +540,8 @@ def lazy_map_partitions(cls, partitions, map_func):
             Partitions of Modin Frame.
         map_func : callable
             Function to apply.
+        func_args : iterable, optional
+            Positional arguments for the 'map_func'.

         Returns
         -------
@@ -549,7 +551,13 @@ def lazy_map_partitions(cls, partitions, map_func):
         preprocessed_map_func = cls.preprocess_func(map_func)
         return np.array(
             [
-                [part.add_to_apply_calls(preprocessed_map_func) for part in row]
+                [
+                    part.add_to_apply_calls(
+                        preprocessed_map_func,
+                        *(tuple() if func_args is None else func_args),
+                    )
+                    for part in row
+                ]
                 for row in partitions
             ]
         )
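Note: `func_args` lets callers thread extra positional arguments through to `map_func` for every partition, as the groupby column-aligning code above does with the precomputed column labels. A toy, eager stand-in for the lazy machinery (`lazy_map` here is illustrative, not Modin API):

```python
import pandas as pd


def lazy_map(parts, map_func, func_args=None):
    # Eager stand-in: Modin would instead queue the call on each partition.
    args = tuple() if func_args is None else tuple(func_args)
    return [map_func(df, *args) for df in parts]


parts = [pd.DataFrame({"a": [1]}), pd.DataFrame({"b": [2]})]
aligned = pd.Index(["a", "b"])

result = lazy_map(parts, lambda df, cols: df.reindex(columns=cols), func_args=(aligned,))
print(result[1].columns.tolist())  # ['a', 'b']
```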