From ed0eba46546064162d75e8dab64ecbf14f5182c9 Mon Sep 17 00:00:00 2001 From: Andrey Pavlenko Date: Fri, 18 Aug 2023 16:22:34 +0200 Subject: [PATCH 01/32] FIX-#6479: HDK CalciteBuilder: Do not call is_bool_dtype() for categorical (#6480) Signed-off-by: Andrey Pavlenko --- .../implementations/hdk_on_native/calcite_builder.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_builder.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_builder.py index bf7237d7db3..e755ef9369c 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_builder.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_builder.py @@ -45,7 +45,7 @@ ) from collections import abc -from pandas.core.dtypes.common import get_dtype, is_bool_dtype +from pandas.core.dtypes.common import get_dtype, is_categorical_dtype, is_bool_dtype class CalciteBuilder: @@ -903,7 +903,10 @@ def _process_groupby(self, op): bool_cols := { c: cast_agg[agg_exprs[c].agg] for c, t in frame.dtypes.items() - if is_bool_dtype(t) and agg_exprs[c].agg in cast_agg + # Do not call is_bool_dtype() for categorical since it checks all the categories + if not is_categorical_dtype(t) + and is_bool_dtype(t) + and agg_exprs[c].agg in cast_agg } ): trans = self._input_ctx()._maybe_copy_and_translate_expr From 92da6b52dcfcfedf515814aa1ae248ad66680f03 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Sat, 26 Aug 2023 13:32:06 +0200 Subject: [PATCH 02/32] FIX-#6509: Fix 'reshuffling' in case of a string key (#6510) Signed-off-by: Dmitry Chigarev --- modin/conftest.py | 15 +++++++++++++++ .../dataframe/pandas/dataframe/dataframe.py | 6 +++--- modin/pandas/test/test_groupby.py | 17 +++++++++++++++++ .../storage_formats/pandas/test_internals.py | 15 --------------- 4 files changed, 35 insertions(+), 18 deletions(-) diff --git a/modin/conftest.py b/modin/conftest.py index 28c91547dc0..e6beb63b29b 100644 --- a/modin/conftest.py +++ b/modin/conftest.py @@ -712,3 +712,18 @@ def s3_resource(s3_base): if not cli.list_buckets()["Buckets"]: break time.sleep(0.1) + + +@pytest.fixture +def modify_config(request): + values = request.param + old_values = {} + + for key, value in values.items(): + old_values[key] = key.get() + key.put(value) + + yield # waiting for the test to be completed + # restoring old parameters + for key, value in old_values.items(): + key.put(value) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index fda5ad7b4a1..7c088a38f57 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -2311,12 +2311,12 @@ def _apply_func_to_range_partitioning( # simply combine all partitions and apply the sorting to the whole dataframe return self.combine_and_apply(func=func) - if self.dtypes[key_column] == object: + if is_numeric_dtype(self.dtypes[key_column]): + method = "linear" + else: # This means we are not sorting numbers, so we need our quantiles to not try # arithmetic on the values. 
method = "inverted_cdf" - else: - method = "linear" shuffling_functions = build_sort_functions( self, diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index be3eb4866d8..cb1d684d2e4 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -2783,3 +2783,20 @@ def perform(lib): return getattr(grp, func)() eval_general(pd, pandas, perform) + + +@pytest.mark.parametrize( + "modify_config", [{ExperimentalGroupbyImpl: True}], indirect=True +) +def test_reshuffling_groupby_on_strings(modify_config): + # reproducer from https://github.com/modin-project/modin/issues/6509 + modin_df, pandas_df = create_test_dfs( + {"col1": ["a"] * 50 + ["b"] * 50, "col2": range(100)} + ) + + modin_df = modin_df.astype({"col1": "string"}) + pandas_df = pandas_df.astype({"col1": "string"}) + + eval_general( + modin_df.groupby("col1"), pandas_df.groupby("col1"), lambda grp: grp.mean() + ) diff --git a/modin/test/storage_formats/pandas/test_internals.py b/modin/test/storage_formats/pandas/test_internals.py index 154a2490a5e..d852a7b1db4 100644 --- a/modin/test/storage_formats/pandas/test_internals.py +++ b/modin/test/storage_formats/pandas/test_internals.py @@ -117,21 +117,6 @@ def construct_modin_df_by_scheme(pandas_df, partitioning_scheme): return md_df -@pytest.fixture -def modify_config(request): - values = request.param - old_values = {} - - for key, value in values.items(): - old_values[key] = key.get() - key.put(value) - - yield # waiting for the test to be completed - # restoring old parameters - for key, value in old_values.items(): - key.put(value) - - def validate_partitions_cache(df): """Assert that the ``PandasDataframe`` shape caches correspond to the actual partition's shapes.""" row_lengths = df._row_lengths_cache From a6301c56821cc7d85c63f363dbf6499d4bf3e267 Mon Sep 17 00:00:00 2001 From: Andrey Pavlenko Date: Mon, 28 Aug 2023 19:12:53 +0200 Subject: [PATCH 03/32] FIX-#6514: test_sort_cols_str from test_dataframe.py crashed on HDK 0.7.0 and python 3.9 (#6515) Signed-off-by: Andrey Pavlenko --- .../hdk_on_native/test/test_dataframe.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/test/test_dataframe.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/test/test_dataframe.py index 9653df62477..f85b0c99b9a 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/test/test_dataframe.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/test/test_dataframe.py @@ -31,6 +31,8 @@ from .utils import eval_io, ForceHdkImport, set_execution_mode, run_and_compare from pandas.core.dtypes.common import is_list_like +from pyhdk import __version__ as hdk_version + StorageFormat.put("hdk") import modin.pandas as pd @@ -2064,19 +2066,21 @@ def test_cat_codes(self): class TestSort: + # In order for the row order to be deterministic after sorting, + # the `by` columns should not contain duplicate values. 
data = { - "a": [1, 2, 5, 2, 5, 4, 4, 5, 2], + "a": [1, 2, 5, -2, -5, 4, -4, 6, 3], "b": [1, 2, 3, 6, 5, 1, 4, 5, 3], "c": [5, 4, 2, 3, 1, 1, 4, 5, 6], "d": ["1", "4", "3", "2", "1", "6", "7", "5", "0"], } data_nulls = { - "a": [1, 2, 5, 2, 5, 4, 4, None, 2], + "a": [1, 2, 5, -2, -5, 4, -4, None, 3], "b": [1, 2, 3, 6, 5, None, 4, 5, 3], "c": [None, 4, 2, 3, 1, 1, 4, 5, 6], } data_multiple_nulls = { - "a": [1, 2, None, 2, 5, 4, 4, None, 2], + "a": [1, 2, None, -2, 5, 4, -4, None, 3], "b": [1, 2, 3, 6, 5, None, 4, 5, None], "c": [None, 4, 2, None, 1, 1, 4, 5, 6], } @@ -2094,6 +2098,7 @@ def test_sort_cols(self, cols, ignore_index, index_cols, ascending): def sort(df, cols, ignore_index, index_cols, ascending, **kwargs): if index_cols: df = df.set_index(index_cols) + df_equals_with_non_stable_indices() return df.sort_values(cols, ignore_index=ignore_index, ascending=ascending) run_and_compare( @@ -2119,6 +2124,10 @@ def sort(df, ascending, **kwargs): ascending=ascending, ) + @pytest.mark.skipif( + hdk_version == "0.7.0", + reason="https://github.com/modin-project/modin/issues/6514", + ) @pytest.mark.parametrize("ascending", ascending_values) def test_sort_cols_str(self, ascending): def sort(df, ascending, **kwargs): From 19b3696da7ab9a2e70f2bef547b9877dbd35c05d Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Mon, 28 Aug 2023 23:07:52 +0200 Subject: [PATCH 04/32] FIX-#6465: Fix `groupby.apply()` for UDFs that change the output's shape (#6506) Signed-off-by: Dmitry Chigarev --- modin/conftest.py | 12 +- .../dataframe/pandas/dataframe/dataframe.py | 108 +++++++++++++++++- .../pandas/partitioning/partition_manager.py | 12 +- .../storage_formats/pandas/query_compiler.py | 4 + modin/pandas/test/test_groupby.py | 70 +++++++++++- 5 files changed, 200 insertions(+), 6 deletions(-) diff --git a/modin/conftest.py b/modin/conftest.py index e6beb63b29b..8bd218c4df6 100644 --- a/modin/conftest.py +++ b/modin/conftest.py @@ -726,4 +726,14 @@ def modify_config(request): yield # waiting for the test to be completed # restoring old parameters for key, value in old_values.items(): - key.put(value) + try: + key.put(value) + except ValueError as e: + # sometimes bool env variables have 'None' as a default value, which + # causes a ValueError when we try to set this value back, as technically, + # only bool values are allowed (and 'None' is not a bool), in this case + # we try to set 'False' instead + if key.type == bool and value is None: + key.put(False) + else: + raise e diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 7c088a38f57..7754e5df1f2 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -31,7 +31,7 @@ from pandas._libs.lib import no_default from typing import List, Hashable, Optional, Callable, Union, Dict, TYPE_CHECKING -from modin.config import Engine +from modin.config import Engine, IsRayCluster from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler from modin.core.storage_formats.pandas.utils import get_length_list from modin.error_message import ErrorMessage @@ -3495,6 +3495,7 @@ def groupby( by: Union[str, List[str]], operator: Callable, result_schema: Optional[Dict[Hashable, type]] = None, + align_result_columns=False, **kwargs: dict, ) -> "PandasDataframe": """ @@ -3512,6 +3513,10 @@ def groupby( on the output desired by the user. 
result_schema : dict, optional Mapping from column labels to data types that represents the types of the output dataframe. + align_result_columns : bool, default: False + Whether to manually align columns between all the resulted row partitions. + This flag is helpful when dealing with UDFs as they can change the partition's shape + and labeling unpredictably, resulting in an invalid dataframe. **kwargs : dict Additional arguments to pass to the ``df.groupby`` method (besides the 'by' argument). @@ -3541,19 +3546,118 @@ def groupby( if not isinstance(by, list): by = [by] + skip_on_aligning_flag = "__skip_me_on_aligning__" + def apply_func(df): # pragma: no cover if any(is_categorical_dtype(dtype) for dtype in df.dtypes[by].values): raise NotImplementedError( "Reshuffling groupby is not yet supported when grouping on a categorical column. " + "https://github.com/modin-project/modin/issues/5925" ) - return operator(df.groupby(by, **kwargs)) + result = operator(df.groupby(by, **kwargs)) + if ( + align_result_columns + and df.empty + and result.empty + and df.columns.equals(result.columns) + ): + # We want to align columns only of those frames that actually performed + # some groupby aggregation, if an empty frame was originally passed + # (an empty bin on reshuffling was created) then there were no groupby + # executed over this partition and so it has incorrect columns + # that shouldn't be considered on the aligning phase + result.attrs[skip_on_aligning_flag] = True + return result result = self._apply_func_to_range_partitioning( key_column=by[0], func=apply_func, ) + # no need aligning columns if there's only one row partition + if align_result_columns and result._partitions.shape[0] > 1: + # FIXME: the current reshuffling implementation guarantees us that there's only one column + # partition in the result, so we should never hit this exception for now, however + # in the future, we might want to make this implementation more broader + if result._partitions.shape[1] > 1: + raise NotImplementedError( + "Aligning columns is not yet implemented for multiple column partitions." + ) + + # There're two implementations: + # 1. The first one work faster, but may stress the network a lot in cluster mode since + # it gathers all the dataframes in a single ray-kernel. + # 2. The second one works slower, but only gathers light pandas.Index objects, + # so there should be less stress on the network. 
+ if not IsRayCluster.get(): + + def compute_aligned_columns(*dfs): + """Take row partitions, filter empty ones, and return joined columns for them.""" + valid_dfs = [ + df + for df in dfs + if not df.attrs.get(skip_on_aligning_flag, False) + ] + if len(valid_dfs) == 0 and len(dfs) != 0: + valid_dfs = dfs + + # Using '.concat()' on empty-slices instead of 'Index.join()' + # in order to get identical behavior to pandas when it joins + # results of different groups + return pandas.concat( + [df.iloc[:0] for df in valid_dfs], axis=0, join="outer" + ).columns + + # Passing all partitions to the 'compute_aligned_columns' kernel to get + # aligned columns + parts = result._partitions.flatten() + aligned_columns = parts[0].apply( + compute_aligned_columns, *[part._data for part in parts[1:]] + ) + + # Lazily applying aligned columns to partitions + new_partitions = self._partition_mgr_cls.lazy_map_partitions( + result._partitions, + lambda df, columns: df.reindex(columns=columns), + func_args=(aligned_columns._data,), + ) + else: + + def join_cols(df, *cols): + """Join `cols` and apply the joined columns to `df`.""" + valid_cols = [ + pandas.DataFrame(columns=col) for col in cols if col is not None + ] + if len(valid_cols) == 0: + return df + # Using '.concat()' on empty-slices instead of 'Index.join()' + # in order to get identical behavior to pandas when it joins + # results of different groups + result_col = pandas.concat(valid_cols, axis=0, join="outer").columns + return df.reindex(columns=result_col) + + # Getting futures for columns of non-empty partitions + cols = [ + part.apply( + lambda df: None + if df.attrs.get(skip_on_aligning_flag, False) + else df.columns + )._data + for part in result._partitions.flatten() + ] + + # Lazily joining and applying the aligned columns + new_partitions = self._partition_mgr_cls.lazy_map_partitions( + result._partitions, + join_cols, + func_args=cols, + ) + result = self.__constructor__( + new_partitions, + index=result.copy_index_cache(), + row_lengths=result._row_lengths_cache, + ) + if result_schema is not None: new_dtypes = pandas.Series(result_schema) diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index 7dce9702f22..b7a4652b558 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -530,7 +530,7 @@ def map_partitions(cls, partitions, map_func): @classmethod @wait_computations_if_benchmark_mode - def lazy_map_partitions(cls, partitions, map_func): + def lazy_map_partitions(cls, partitions, map_func, func_args=None): """ Apply `map_func` to every partition in `partitions` *lazily*. @@ -540,6 +540,8 @@ def lazy_map_partitions(cls, partitions, map_func): Partitions of Modin Frame. map_func : callable Function to apply. + func_args : iterable, optional + Positional arguments for the 'map_func'. 
Returns ------- @@ -549,7 +551,13 @@ def lazy_map_partitions(cls, partitions, map_func): preprocessed_map_func = cls.preprocess_func(map_func) return np.array( [ - [part.add_to_apply_calls(preprocessed_map_func) for part in row] + [ + part.add_to_apply_calls( + preprocessed_map_func, + *(tuple() if func_args is None else func_args), + ) + for part in row + ] for row in partitions ] ) diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 5787c3071b8..7ae766f9fc4 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -3488,6 +3488,10 @@ def agg_func(grp, *args, **kwargs): axis=axis, by=by, operator=lambda grp: agg_func(grp, *agg_args, **agg_kwargs), + # UDFs passed to '.apply()' are allowed to produce results with arbitrary shapes, + # that's why we have to align the partition's shapes/labeling across different + # row partitions + align_result_columns=how == "group_wise", **groupby_kwargs, ) result_qc = self.__constructor__(result) diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index cb1d684d2e4..af42a12bc64 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -19,7 +19,7 @@ import datetime from modin.config import StorageFormat -from modin.config.envvars import ExperimentalGroupbyImpl +from modin.config.envvars import ExperimentalGroupbyImpl, IsRayCluster from modin.core.dataframe.pandas.partitioning.axis_partition import ( PandasDataframeAxisPartition, ) @@ -2785,6 +2785,74 @@ def perform(lib): eval_general(pd, pandas, perform) +# there are two different implementations of partitions aligning for cluster and non-cluster mode, +# here we want to test both of them, so simply modifying the config for this test +@pytest.mark.parametrize( + "modify_config", + [ + {ExperimentalGroupbyImpl: True, IsRayCluster: True}, + {ExperimentalGroupbyImpl: True, IsRayCluster: False}, + ], + indirect=True, +) +def test_shape_changing_udf(modify_config): + modin_df, pandas_df = create_test_dfs( + { + "by_col1": ([1] * 50) + ([10] * 50), + "col2": np.arange(100), + "col3": np.arange(100), + } + ) + + def func1(group): + # changes the original shape and indexing of the 'group' + return pandas.Series( + [1, 2, 3, 4], index=["new_col1", "new_col2", "new_col4", "new_col3"] + ) + + eval_general( + modin_df.groupby("by_col1"), + pandas_df.groupby("by_col1"), + lambda df: df.apply(func1), + ) + + def func2(group): + # each group have different shape at the end + # (we do .to_frame().T as otherwise this scenario doesn't work in pandas) + if group.iloc[0, 0] == 1: + return ( + pandas.Series( + [1, 2, 3, 4], index=["new_col1", "new_col2", "new_col4", "new_col3"] + ) + .to_frame() + .T + ) + return ( + pandas.Series([20, 33, 44], index=["new_col2", "new_col3", "new_col4"]) + .to_frame() + .T + ) + + eval_general( + modin_df.groupby("by_col1"), + pandas_df.groupby("by_col1"), + lambda df: df.apply(func2), + ) + + def func3(group): + # one of the groups produce an empty dataframe, in the result we should + # have joined columns of both of these dataframes + if group.iloc[0, 0] == 1: + return pandas.DataFrame([[1, 2, 3]], index=["col1", "col2", "col3"]) + return pandas.DataFrame(columns=["col2", "col3", "col4", "col5"]) + + eval_general( + modin_df.groupby("by_col1"), + pandas_df.groupby("by_col1"), + lambda df: df.apply(func3), + ) + + @pytest.mark.parametrize( "modify_config", [{ExperimentalGroupbyImpl: True}], 
indirect=True ) From 77f75b318077f416c2d9a5f0fce12f59601785d0 Mon Sep 17 00:00:00 2001 From: Andrey Pavlenko Date: Tue, 29 Aug 2023 16:52:17 +0200 Subject: [PATCH 05/32] FIX-#6516: HDK: test_dataframe.py is crashed if Calcite is disabled (#6517) Signed-off-by: Andrey Pavlenko Co-authored-by: Anatoly Myachev --- .../implementations/hdk_on_native/calcite_algebra.py | 11 ++++++++--- .../implementations/hdk_on_native/calcite_builder.py | 6 ++++-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_algebra.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_algebra.py index 8929890d11c..633878c2e25 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_algebra.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_algebra.py @@ -146,14 +146,19 @@ def __init__(self, relOp): self.relOp = relOp @classmethod - def reset_id(cls): + def reset_id(cls, next_id=0): """ - Reset ID to be used for the next new node to 0. + Reset ID to be used for the next new node to `next_id`. Can be used to have a zero-based numbering for each generated query. + + Parameters + ---------- + next_id : int, default: 0 + Next node id. """ - cls._next_id[0] = 0 + cls._next_id[0] = next_id class CalciteScanNode(CalciteBaseNode): diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_builder.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_builder.py index e755ef9369c..ecb3bb1d97d 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_builder.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_builder.py @@ -752,8 +752,10 @@ def _push(self, node): and all(isinstance(expr, CalciteInputRefExpr) for expr in node.exprs) ): # Replace the last CalciteProjectionNode with this one and - # translate the input refs. - exprs = self.res.pop().exprs + # translate the input refs. The `id` attribute is preserved. 
+ last = self.res.pop() + exprs = last.exprs + last.reset_id(int(last.id)) node = CalciteProjectionNode( node.fields, [exprs[expr.input] for expr in node.exprs] ) From fd174ec1010938b6661cc6717fc0bdb87b484ce8 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Wed, 30 Aug 2023 22:24:48 +0200 Subject: [PATCH 06/32] FIX-#6519: consider 'botocore' as an optional dependency (#6521) Signed-off-by: Anatoly Myachev --- modin/core/io/file_dispatcher.py | 29 +++++----- .../core/io/text/csv_glob_dispatcher.py | 54 +++++++++++-------- 2 files changed, 49 insertions(+), 34 deletions(-) diff --git a/modin/core/io/file_dispatcher.py b/modin/core/io/file_dispatcher.py index e201fd6b324..0d0cd2a0a7f 100644 --- a/modin/core/io/file_dispatcher.py +++ b/modin/core/io/file_dispatcher.py @@ -89,7 +89,7 @@ def __enter__(self): PermissionError, ) except ModuleNotFoundError: - credential_error_type = () + credential_error_type = (PermissionError,) args = (self.file_path, self.mode, self.compression) @@ -257,11 +257,21 @@ def file_exists(cls, file_path, storage_options=None): if not is_fsspec_url(file_path) and not is_url(file_path): return os.path.exists(file_path) - from botocore.exceptions import ( - NoCredentialsError, - EndpointConnectionError, - ConnectTimeoutError, - ) + try: + from botocore.exceptions import ( + NoCredentialsError, + EndpointConnectionError, + ConnectTimeoutError, + ) + + credential_error_type = ( + NoCredentialsError, + PermissionError, + EndpointConnectionError, + ConnectTimeoutError, + ) + except ModuleNotFoundError: + credential_error_type = (PermissionError,) if storage_options is not None: new_storage_options = dict(storage_options) @@ -273,12 +283,7 @@ def file_exists(cls, file_path, storage_options=None): exists = False try: exists = fs.exists(file_path) - except ( - NoCredentialsError, - PermissionError, - EndpointConnectionError, - ConnectTimeoutError, - ): + except credential_error_type: fs, _ = fsspec.core.url_to_fs(file_path, anon=True, **new_storage_options) exists = fs.exists(file_path) diff --git a/modin/experimental/core/io/text/csv_glob_dispatcher.py b/modin/experimental/core/io/text/csv_glob_dispatcher.py index 88531efe43c..77771bcea60 100644 --- a/modin/experimental/core/io/text/csv_glob_dispatcher.py +++ b/modin/experimental/core/io/text/csv_glob_dispatcher.py @@ -282,11 +282,21 @@ def file_exists(cls, file_path: str, storage_options=None) -> bool: if not is_fsspec_url(file_path): return len(glob.glob(file_path)) > 0 - from botocore.exceptions import ( - NoCredentialsError, - EndpointConnectionError, - ConnectTimeoutError, - ) + try: + from botocore.exceptions import ( + NoCredentialsError, + EndpointConnectionError, + ConnectTimeoutError, + ) + + credential_error_type = ( + NoCredentialsError, + PermissionError, + EndpointConnectionError, + ConnectTimeoutError, + ) + except ModuleNotFoundError: + credential_error_type = (PermissionError,) if storage_options is not None: new_storage_options = dict(storage_options) @@ -298,12 +308,7 @@ def file_exists(cls, file_path: str, storage_options=None) -> bool: exists = False try: exists = fs.exists(file_path) - except ( - NoCredentialsError, - PermissionError, - EndpointConnectionError, - ConnectTimeoutError, - ): + except credential_error_type: fs, _ = fsspec.core.url_to_fs(file_path, anon=True, **new_storage_options) exists = fs.exists(file_path) return exists or len(fs.glob(file_path)) > 0 @@ -328,11 +333,21 @@ def get_path(cls, file_path: str) -> list: abs_paths = [os.path.abspath(path) for path in relative_paths] return 
abs_paths - from botocore.exceptions import ( - NoCredentialsError, - EndpointConnectionError, - ConnectTimeoutError, - ) + try: + from botocore.exceptions import ( + NoCredentialsError, + EndpointConnectionError, + ConnectTimeoutError, + ) + + credential_error_type = ( + NoCredentialsError, + PermissionError, + EndpointConnectionError, + ConnectTimeoutError, + ) + except ModuleNotFoundError: + credential_error_type = (PermissionError,) def get_file_path(fs_handle) -> List[str]: file_paths = fs_handle.glob(file_path) @@ -344,12 +359,7 @@ def get_file_path(fs_handle) -> List[str]: fs, _ = fsspec.core.url_to_fs(file_path) try: return get_file_path(fs) - except ( - NoCredentialsError, - PermissionError, - EndpointConnectionError, - ConnectTimeoutError, - ): + except credential_error_type: fs, _ = fsspec.core.url_to_fs(file_path, anon=True) return get_file_path(fs) From fcc404a0db155359ba2bcd070feb0d80cb4b7ad2 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Thu, 31 Aug 2023 09:04:29 +0200 Subject: [PATCH 07/32] FIX-#4347: read_excel: defaults to pandas for unsupported types of 'io' (#6462) Signed-off-by: Anatoly Myachev --- modin/core/io/io.py | 7 ++++ modin/core/io/text/excel_dispatcher.py | 26 +++++++++++-- modin/pandas/io.py | 13 ++++++- modin/pandas/test/test_io.py | 54 +++++++++++++++++++++++--- 4 files changed, 90 insertions(+), 10 deletions(-) diff --git a/modin/core/io/io.py b/modin/core/io/io.py index f79ce04f9b2..30c0e186661 100644 --- a/modin/core/io/io.py +++ b/modin/core/io/io.py @@ -28,6 +28,7 @@ from modin.error_message import ErrorMessage from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler from modin.utils import _inherit_docstrings +from modin.pandas.io import ExcelFile _doc_default_io_method = """ {summary} using pandas. 
@@ -277,6 +278,12 @@ def read_clipboard(cls, sep=r"\s+", **kwargs): # pragma: no cover # noqa: PR01 ) def read_excel(cls, **kwargs): # noqa: PR01 ErrorMessage.default_to_pandas("`read_excel`") + if isinstance(kwargs["io"], ExcelFile): + # otherwise, Modin objects may be passed to the pandas context, resulting + # in undefined behavior + # for example in the case: pd.read_excel(pd.ExcelFile), since reading from + # pd.ExcelFile in `read_excel` isn't supported + kwargs["io"]._set_pandas_mode() intermediate = pandas.read_excel(**kwargs) if isinstance(intermediate, (OrderedDict, dict)): parsed = type(intermediate)() diff --git a/modin/core/io/text/excel_dispatcher.py b/modin/core/io/text/excel_dispatcher.py index 4fd40b6424f..b9235abcae6 100644 --- a/modin/core/io/text/excel_dispatcher.py +++ b/modin/core/io/text/excel_dispatcher.py @@ -13,12 +13,16 @@ """Module houses `ExcelDispatcher` class, that is used for reading excel files.""" +import os +from io import BytesIO + import pandas import re import warnings from modin.core.io.text.text_file_dispatcher import TextFileDispatcher from modin.config import NPartitions +from modin.pandas.io import ExcelFile EXCEL_READ_BLOCK_SIZE = 4096 @@ -55,6 +59,22 @@ def _read(cls, io, **kwargs): **kwargs ) + if isinstance(io, bytes): + io = BytesIO(io) + + # isinstance(ExcelFile, os.PathLike) == True + if not isinstance(io, (str, os.PathLike, BytesIO)) or isinstance( + io, (ExcelFile, pandas.ExcelFile) + ): + if isinstance(io, ExcelFile): + io._set_pandas_mode() + return cls.single_worker_read( + io, + reason="Modin only implements parallel `read_excel` the following types of `io`: " + + "str, os.PathLike, io.BytesIO.", + **kwargs + ) + from zipfile import ZipFile from openpyxl.worksheet.worksheet import Worksheet from openpyxl.worksheet._reader import WorksheetReader @@ -79,7 +99,7 @@ def _read(cls, io, **kwargs): # NOTE: ExcelReader() in read-only mode does not close file handle by itself # work around that by passing file object if we received some path - io_file = open(io, "rb") if isinstance(io, str) else io + io_file = open(io, "rb") if isinstance(io, (str, os.PathLike)) else io try: ex = ExcelReader(io_file, read_only=True) ex.read() @@ -90,14 +110,12 @@ def _read(cls, io, **kwargs): ex.read_strings() ws = Worksheet(wb) finally: - if isinstance(io, str): + if isinstance(io, (str, os.PathLike)): # close only if it were us who opened the object io_file.close() pandas_kw = dict(kwargs) # preserve original kwargs with ZipFile(io) as z: - from io import BytesIO - # Convert index to sheet name in file if isinstance(sheet_name, int): sheet_name = "sheet{}".format(sheet_name + 1) diff --git a/modin/pandas/io.py b/modin/pandas/io.py index 8373305d258..62fa75390ef 100644 --- a/modin/pandas/io.py +++ b/modin/pandas/io.py @@ -835,10 +835,21 @@ class ExcelFile(ClassLogger, pandas.ExcelFile): # noqa: PR01, D200 Class for parsing tabular excel sheets into DataFrame objects. 
""" + _behave_like_pandas = False + + def _set_pandas_mode(self): # noqa + # disable Modin behavior to be able to pass object to `pandas.read_excel` + # otherwise, Modin objects may be passed to the pandas context, resulting + # in undefined behavior + self._behave_like_pandas = True + def __getattribute__(self, item): + if item in ["_set_pandas_mode", "_behave_like_pandas"]: + return object.__getattribute__(self, item) + default_behaviors = ["__init__", "__class__"] method = super(ExcelFile, self).__getattribute__(item) - if item not in default_behaviors: + if not self._behave_like_pandas and item not in default_behaviors: if callable(method): def return_handler(*args, **kwargs): diff --git a/modin/pandas/test/test_io.py b/modin/pandas/test/test_io.py index 0649f604746..f6c76141815 100644 --- a/modin/pandas/test/test_io.py +++ b/modin/pandas/test/test_io.py @@ -43,6 +43,7 @@ from modin.test.test_utils import warns_that_defaulting_to_pandas import pyarrow as pa import os +from io import BytesIO, StringIO from scipy import sparse import sys import sqlalchemy as sa @@ -1005,8 +1006,6 @@ def test_read_csv_default_to_pandas(self): warning_suffix = "" with warns_that_defaulting_to_pandas(suffix=warning_suffix): # This tests that we default to pandas on a buffer - from io import StringIO - with open(pytest.csvs_names["test_read_csv_regular"], "r") as _f: pd.read_csv(StringIO(_f.read())) @@ -1880,11 +1879,13 @@ def test_read_json_metadata(self, make_json_file): @pytest.mark.filterwarnings(default_to_pandas_ignore_string) class TestExcel: @check_file_leaks - def test_read_excel(self, make_excel_file): + @pytest.mark.parametrize("pathlike", [False, True]) + def test_read_excel(self, pathlike, make_excel_file): + unique_filename = make_excel_file() eval_io( fn_name="read_excel", # read_excel kwargs - io=make_excel_file(), + io=Path(unique_filename) if pathlike else unique_filename, ) @check_file_leaks @@ -2021,11 +2022,54 @@ def test_ExcelFile(self, make_excel_file): try: df_equals(modin_excel_file.parse(), pandas_excel_file.parse()) assert modin_excel_file.io == unique_filename - assert isinstance(modin_excel_file, pd.ExcelFile) finally: modin_excel_file.close() pandas_excel_file.close() + def test_ExcelFile_bytes(self, make_excel_file): + unique_filename = make_excel_file() + with open(unique_filename, mode="rb") as f: + content = f.read() + + modin_excel_file = pd.ExcelFile(content) + pandas_excel_file = pandas.ExcelFile(content) + + df_equals(modin_excel_file.parse(), pandas_excel_file.parse()) + + def test_read_excel_ExcelFile(self, make_excel_file): + unique_filename = make_excel_file() + with open(unique_filename, mode="rb") as f: + content = f.read() + + modin_excel_file = pd.ExcelFile(content) + pandas_excel_file = pandas.ExcelFile(content) + + df_equals(pd.read_excel(modin_excel_file), pandas.read_excel(pandas_excel_file)) + + @pytest.mark.parametrize("use_bytes_io", [False, True]) + def test_read_excel_bytes(self, use_bytes_io, make_excel_file): + unique_filename = make_excel_file() + with open(unique_filename, mode="rb") as f: + io_bytes = f.read() + + if use_bytes_io: + io_bytes = BytesIO(io_bytes) + + eval_io( + fn_name="read_excel", + # read_excel kwargs + io=io_bytes, + ) + + def test_read_excel_file_handle(self, make_excel_file): + unique_filename = make_excel_file() + with open(unique_filename, mode="rb") as f: + eval_io( + fn_name="read_excel", + # read_excel kwargs + io=f, + ) + @pytest.mark.xfail(strict=False, reason="Flaky test, defaults to pandas") def test_to_excel(self, 
tmp_path): modin_df, pandas_df = create_test_dfs(TEST_DATA) From afbf46649c07fb67aa2fe096e64f2df5ec48b88b Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Thu, 31 Aug 2023 19:07:12 +0200 Subject: [PATCH 08/32] FIX-#6518: fix interchange protocol for string columns (#6523) Signed-off-by: Anatoly Myachev Co-authored-by: Vasily Litvinov --- .../pandas/interchange/dataframe_protocol/column.py | 11 +++++++---- .../dataframe_protocol/pandas/test_protocol.py | 8 ++++++++ 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/modin/core/dataframe/pandas/interchange/dataframe_protocol/column.py b/modin/core/dataframe/pandas/interchange/dataframe_protocol/column.py index 28aff46a7b7..01471ad45f3 100644 --- a/modin/core/dataframe/pandas/interchange/dataframe_protocol/column.py +++ b/modin/core/dataframe/pandas/interchange/dataframe_protocol/column.py @@ -411,15 +411,18 @@ def _get_validity_buffer(self) -> Tuple[PandasProtocolBuffer, Any]: buf = self._col.to_numpy().flatten() # Determine the encoding for valid values - valid = 1 if invalid == 0 else 0 + valid = invalid == 0 + invalid = not valid - mask = [valid if type(buf[i]) is str else invalid for i in range(buf.size)] + mask = np.empty(shape=(len(buf),), dtype=np.bool_) + for i, obj in enumerate(buf): + mask[i] = valid if isinstance(obj, str) else invalid # Convert the mask array to a Pandas "buffer" using a NumPy array as the backing store - buffer = PandasProtocolBuffer(np.asarray(mask, dtype="uint8")) + buffer = PandasProtocolBuffer(mask) # Define the dtype of the returned buffer - dtype = (DTypeKind.UINT, 8, "C", "=") + dtype = (DTypeKind.BOOL, 8, "b", "=") self._validity_buffer_cache = (buffer, dtype) return self._validity_buffer_cache diff --git a/modin/test/interchange/dataframe_protocol/pandas/test_protocol.py b/modin/test/interchange/dataframe_protocol/pandas/test_protocol.py index b723ad5884b..356b9bec24d 100644 --- a/modin/test/interchange/dataframe_protocol/pandas/test_protocol.py +++ b/modin/test/interchange/dataframe_protocol/pandas/test_protocol.py @@ -13,6 +13,8 @@ """Dataframe exchange protocol tests that are specific for pandas storage format implementation.""" +import pandas + import modin.pandas as pd from modin.pandas.utils import from_dataframe from modin.pandas.test.utils import df_equals, test_data @@ -54,3 +56,9 @@ def test_categorical_from_dataframe(): {"foo": pd.Series(["0", "1", "2", "3", "0", "3", "2", "3"], dtype="category")} ) eval_df_protocol(modin_df) + + +def test_interchange_with_pandas_string(): + modin_df = pd.DataFrame({"fips": ["01001"]}) + pandas_df = pandas.api.interchange.from_dataframe(modin_df.__dataframe__()) + df_equals(modin_df, pandas_df) From 8808bd5b68d5c3844a287989daa270d96967261d Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Fri, 1 Sep 2023 16:45:50 +0200 Subject: [PATCH 09/32] FIX-#5536: remove branch disabling '__getattribute__' for experimental mode (#6529) Signed-off-by: Anatoly Myachev --- modin/pandas/groupby.py | 35 +++++++++++------------------------ 1 file changed, 11 insertions(+), 24 deletions(-) diff --git a/modin/pandas/groupby.py b/modin/pandas/groupby.py index 0732d43db54..6a09de684b9 100644 --- a/modin/pandas/groupby.py +++ b/modin/pandas/groupby.py @@ -37,7 +37,6 @@ from modin.pandas.utils import cast_function_modin2pandas from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler from modin.core.dataframe.algebra.default2pandas.groupby import GroupBy -from modin.config import IsExperimental from .series import Series from .window import 
RollingGroupby from .utils import is_label @@ -182,31 +181,19 @@ def __getattr__(self, key): return self.__getitem__(key) raise err - # TODO: `.__getattribute__` overriding is broken in experimental mode. We should - # remove this branching one it's fixed: - # https://github.com/modin-project/modin/issues/5536 - if not IsExperimental.get(): + def __getattribute__(self, item): + attr = super().__getattribute__(item) + if item not in _DEFAULT_BEHAVIOUR and not self._query_compiler.lazy_execution: + # We default to pandas on empty DataFrames. This avoids a large amount of + # pain in underlying implementation and returns a result immediately rather + # than dealing with the edge cases that empty DataFrames have. + if callable(attr) and self._df.empty and hasattr(self._pandas_class, item): - def __getattribute__(self, item): - attr = super().__getattribute__(item) - if ( - item not in _DEFAULT_BEHAVIOUR - and not self._query_compiler.lazy_execution - ): - # We default to pandas on empty DataFrames. This avoids a large amount of - # pain in underlying implementation and returns a result immediately rather - # than dealing with the edge cases that empty DataFrames have. - if ( - callable(attr) - and self._df.empty - and hasattr(self._pandas_class, item) - ): - - def default_handler(*args, **kwargs): - return self._default_to_pandas(item, *args, **kwargs) + def default_handler(*args, **kwargs): + return self._default_to_pandas(item, *args, **kwargs) - return default_handler - return attr + return default_handler + return attr @property def ngroups(self): From fb43709b24d56dc7e26c89f25cf081cbe2496239 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Tue, 5 Sep 2023 09:05:07 +0200 Subject: [PATCH 10/32] FIX-#4687: change 'Column.null_count' to return a builtin int instead of NumPy scalar (#6526) Signed-off-by: Anatoly Myachev --- .../pandas/interchange/dataframe_protocol/column.py | 2 +- modin/test/interchange/dataframe_protocol/test_general.py | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/modin/core/dataframe/pandas/interchange/dataframe_protocol/column.py b/modin/core/dataframe/pandas/interchange/dataframe_protocol/column.py index 01471ad45f3..08185f38485 100644 --- a/modin/core/dataframe/pandas/interchange/dataframe_protocol/column.py +++ b/modin/core/dataframe/pandas/interchange/dataframe_protocol/column.py @@ -251,7 +251,7 @@ def reduce_func(df): # Otherwise, we get mismatching internal and external indices for both axes intermediate_df.index = pandas.RangeIndex(1) intermediate_df.columns = pandas.RangeIndex(1) - self._null_count_cache = intermediate_df.to_pandas().squeeze() + self._null_count_cache = intermediate_df.to_pandas().squeeze(axis=1).item() return self._null_count_cache @property diff --git a/modin/test/interchange/dataframe_protocol/test_general.py b/modin/test/interchange/dataframe_protocol/test_general.py index 5a1aa6f9236..b4373835431 100644 --- a/modin/test/interchange/dataframe_protocol/test_general.py +++ b/modin/test/interchange/dataframe_protocol/test_general.py @@ -82,6 +82,14 @@ def test_na_float(df_from_dict): assert colX.null_count == 1 +def test_null_count(df_from_dict): + df = df_from_dict({"foo": [42]}) + dfX = df.__dataframe__() + colX = dfX.get_column_by_name("foo") + null_count = colX.null_count + assert null_count == 0 and type(null_count) is int + + def test_noncategorical(df_from_dict): df = df_from_dict({"a": [1, 2, 3]}) dfX = df.__dataframe__() From be29452f0021addbd5abd8d861579f413877f5b9 Mon Sep 17 00:00:00 2001 From: Dmitry 
Chigarev Date: Tue, 5 Sep 2023 17:20:24 +0200 Subject: [PATCH 11/32] FIX-#6535: Pin 's3fs<2023.9.0' (#6536) Signed-off-by: Dmitry Chigarev --- environment-dev.yml | 2 +- requirements-dev.txt | 2 +- requirements/env_hdk.yml | 2 +- requirements/env_unidist.yml | 2 +- requirements/requirements-no-engine.yml | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/environment-dev.yml b/environment-dev.yml index 68eb42156e1..168eb24ccc6 100644 --- a/environment-dev.yml +++ b/environment-dev.yml @@ -26,7 +26,7 @@ dependencies: # - xarray - Jinja2>=3.0.0 - scipy>=1.7.1 - - s3fs>=2021.8 + - s3fs>=2021.8,<2023.9.0 - lxml>=4.6.3 - openpyxl>=3.0.7 - xlrd>=2.0.1 diff --git a/requirements-dev.txt b/requirements-dev.txt index a6b70f78190..1ab23f7f2cf 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -16,7 +16,7 @@ distributed>=2.22.0 xarray Jinja2>=3.0.0 scipy>=1.7.1 -s3fs>=2021.8 +s3fs>=2021.8,<2023.9.0 lxml>=4.6.3 openpyxl>=3.0.7 xlrd>=2.0.1 diff --git a/requirements/env_hdk.yml b/requirements/env_hdk.yml index 28bf4e6951b..781c3d5b1be 100644 --- a/requirements/env_hdk.yml +++ b/requirements/env_hdk.yml @@ -13,7 +13,7 @@ dependencies: - psutil>=5.8.0 # optional dependencies - - s3fs>=2021.8 + - s3fs>=2021.8,<2023.9.0 - openpyxl>=3.0.10 - xlrd>=2.0.1 - sqlalchemy>=1.4.0,<1.4.46 diff --git a/requirements/env_unidist.yml b/requirements/env_unidist.yml index eb5d098e06b..b28ea2764ff 100644 --- a/requirements/env_unidist.yml +++ b/requirements/env_unidist.yml @@ -18,7 +18,7 @@ dependencies: # - xarray - Jinja2>=3.0.0 - scipy>=1.7.1 - - s3fs>=2021.8 + - s3fs>=2021.8,<2023.9.0 - lxml>=4.6.3 - openpyxl>=3.0.7 - xlrd>=2.0.1 diff --git a/requirements/requirements-no-engine.yml b/requirements/requirements-no-engine.yml index bb6e2597e58..39f783bb1ce 100644 --- a/requirements/requirements-no-engine.yml +++ b/requirements/requirements-no-engine.yml @@ -16,7 +16,7 @@ dependencies: # - xarray - Jinja2>=3.0.0 - scipy>=1.7.1 - - s3fs>=2021.8 + - s3fs>=2021.8,<2023.9.0 - lxml>=4.6.3 - openpyxl>=3.0.7 - xlrd>=2.0.1 From 75d4aab526d334b44dc7a9934a26079b6b6b8774 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Wed, 6 Sep 2023 09:11:11 +0200 Subject: [PATCH 12/32] FIX-#6532: fix 'read_excel' so that it doesn't use 'rich_text' param for old 'openpyxl' (#6534) Signed-off-by: Anatoly Myachev --- modin/core/storage_formats/pandas/parsers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/core/storage_formats/pandas/parsers.py b/modin/core/storage_formats/pandas/parsers.py index 3305eeb213a..384f991af1f 100644 --- a/modin/core/storage_formats/pandas/parsers.py +++ b/modin/core/storage_formats/pandas/parsers.py @@ -651,7 +651,7 @@ def update_row_nums(match): bytesio = BytesIO(excel_header + bytes_data + footer) # Use openpyxl to read/parse sheet data common_args = (ws, bytesio, ex.shared_strings, False) - if PandasExcelParser.need_rich_text_param: + if PandasExcelParser.need_rich_text_param(): reader = WorksheetReader(*common_args, rich_text=False) else: reader = WorksheetReader(*common_args) From f09385ed4b08db332fe300e3a6d63b8f4d4e2f5e Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Fri, 8 Sep 2023 12:43:01 +0200 Subject: [PATCH 13/32] FIX-#6541: fix 'ValueError: buffer source array is read-only' for 'iloc' (#6538) Signed-off-by: Anatoly Myachev --- modin/core/storage_formats/pandas/query_compiler.py | 11 ++++++++++- modin/numpy/test/utils.py | 2 +- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/modin/core/storage_formats/pandas/query_compiler.py 
b/modin/core/storage_formats/pandas/query_compiler.py index 7ae766f9fc4..cc9c90165d1 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -4129,7 +4129,16 @@ def iloc_mut(partition, row_internal_indices, col_internal_indices, item): Partition data with updated values. """ partition = partition.copy() - partition.iloc[row_internal_indices, col_internal_indices] = item + try: + partition.iloc[row_internal_indices, col_internal_indices] = item + except ValueError: + # `copy` is needed to avoid "ValueError: buffer source array is read-only" for `item` + # because the item may be converted to the type that is in the dataframe. + # TODO: in the future we will need to convert to the correct type manually according + # to the following warning. Example: "FutureWarning: Setting an item of incompatible + # dtype is deprecated and will raise in a future error of pandas. Value '[1.38629436]' + # has dtype incompatible with int64, please explicitly cast to a compatible dtype first." + partition.iloc[row_internal_indices, col_internal_indices] = item.copy() return partition new_modin_frame = self._modin_frame.apply_select_indices( diff --git a/modin/numpy/test/utils.py b/modin/numpy/test/utils.py index 5e378f4ede7..8b32ae6dac4 100644 --- a/modin/numpy/test/utils.py +++ b/modin/numpy/test/utils.py @@ -16,7 +16,7 @@ import modin.numpy as np -def assert_scalar_or_array_equal(x1, x2, err_msg=None): +def assert_scalar_or_array_equal(x1, x2, err_msg=""): """ Assert whether the result of the numpy and modin computations are the same. From c19f73e22069d4f4c456f704b1d57dcd2dd67fe0 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Mon, 11 Sep 2023 14:32:04 +0200 Subject: [PATCH 14/32] FIX-#6537: Unpin 's3fs<2023.9.0' (#6544) Signed-off-by: Anatoly Myachev --- environment-dev.yml | 2 +- modin/experimental/core/io/text/csv_glob_dispatcher.py | 9 ++++++++- requirements-dev.txt | 2 +- requirements/env_hdk.yml | 2 +- requirements/env_unidist.yml | 2 +- requirements/requirements-no-engine.yml | 2 +- 6 files changed, 13 insertions(+), 6 deletions(-) diff --git a/environment-dev.yml b/environment-dev.yml index 168eb24ccc6..68eb42156e1 100644 --- a/environment-dev.yml +++ b/environment-dev.yml @@ -26,7 +26,7 @@ dependencies: # - xarray - Jinja2>=3.0.0 - scipy>=1.7.1 - - s3fs>=2021.8,<2023.9.0 + - s3fs>=2021.8 - lxml>=4.6.3 - openpyxl>=3.0.7 - xlrd>=2.0.1 diff --git a/modin/experimental/core/io/text/csv_glob_dispatcher.py b/modin/experimental/core/io/text/csv_glob_dispatcher.py index 77771bcea60..7de275ee846 100644 --- a/modin/experimental/core/io/text/csv_glob_dispatcher.py +++ b/modin/experimental/core/io/text/csv_glob_dispatcher.py @@ -350,7 +350,14 @@ def get_path(cls, file_path: str) -> list: credential_error_type = (PermissionError,) def get_file_path(fs_handle) -> List[str]: - file_paths = fs_handle.glob(file_path) + if "*" in file_path: + file_paths = fs_handle.glob(file_path) + else: + file_paths = [ + f + for f in fs_handle.find(file_path) + if not f.endswith("/") # exclude folder + ] if len(file_paths) == 0 and not fs_handle.exists(file_path): raise FileNotFoundError(f"Path <{file_path}> isn't available.") fs_addresses = [fs_handle.unstrip_protocol(path) for path in file_paths] diff --git a/requirements-dev.txt b/requirements-dev.txt index 1ab23f7f2cf..a6b70f78190 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -16,7 +16,7 @@ distributed>=2.22.0 xarray Jinja2>=3.0.0 scipy>=1.7.1 -s3fs>=2021.8,<2023.9.0 +s3fs>=2021.8 
lxml>=4.6.3 openpyxl>=3.0.7 xlrd>=2.0.1 diff --git a/requirements/env_hdk.yml b/requirements/env_hdk.yml index 781c3d5b1be..28bf4e6951b 100644 --- a/requirements/env_hdk.yml +++ b/requirements/env_hdk.yml @@ -13,7 +13,7 @@ dependencies: - psutil>=5.8.0 # optional dependencies - - s3fs>=2021.8,<2023.9.0 + - s3fs>=2021.8 - openpyxl>=3.0.10 - xlrd>=2.0.1 - sqlalchemy>=1.4.0,<1.4.46 diff --git a/requirements/env_unidist.yml b/requirements/env_unidist.yml index b28ea2764ff..eb5d098e06b 100644 --- a/requirements/env_unidist.yml +++ b/requirements/env_unidist.yml @@ -18,7 +18,7 @@ dependencies: # - xarray - Jinja2>=3.0.0 - scipy>=1.7.1 - - s3fs>=2021.8,<2023.9.0 + - s3fs>=2021.8 - lxml>=4.6.3 - openpyxl>=3.0.7 - xlrd>=2.0.1 diff --git a/requirements/requirements-no-engine.yml b/requirements/requirements-no-engine.yml index 39f783bb1ce..bb6e2597e58 100644 --- a/requirements/requirements-no-engine.yml +++ b/requirements/requirements-no-engine.yml @@ -16,7 +16,7 @@ dependencies: # - xarray - Jinja2>=3.0.0 - scipy>=1.7.1 - - s3fs>=2021.8,<2023.9.0 + - s3fs>=2021.8 - lxml>=4.6.3 - openpyxl>=3.0.7 - xlrd>=2.0.1 From 1e53b081d852f059a20fa9378154979745b215aa Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Wed, 13 Sep 2023 20:12:23 +0200 Subject: [PATCH 15/32] FIX-#6553: fix 'read_csv' with 'iterator=True' (#6554) Signed-off-by: Anatoly Myachev --- modin/core/io/text/text_file_dispatcher.py | 3 +++ modin/pandas/test/test_io.py | 10 ++++++++++ 2 files changed, 13 insertions(+) diff --git a/modin/core/io/text/text_file_dispatcher.py b/modin/core/io/text/text_file_dispatcher.py index 158212b37ec..75f05356bb1 100644 --- a/modin/core/io/text/text_file_dispatcher.py +++ b/modin/core/io/text/text_file_dispatcher.py @@ -683,6 +683,9 @@ def check_parameters_support( if read_kwargs["chunksize"] is not None: return (False, "`chunksize` parameter is not supported") + if read_kwargs.get("iterator"): + return (False, "`iterator==True` parameter is not supported") + if read_kwargs.get("dialect") is not None: return (False, "`dialect` parameter is not supported") diff --git a/modin/pandas/test/test_io.py b/modin/pandas/test/test_io.py index f6c76141815..b4babb0dddf 100644 --- a/modin/pandas/test/test_io.py +++ b/modin/pandas/test/test_io.py @@ -612,6 +612,16 @@ def test_read_csv_iteration(self, iterator): df_equals(modin_df, pd_df) + # Tests #6553 + if iterator: + rdf_reader = pd.read_csv(filename, iterator=iterator) + pd_reader = pandas.read_csv(filename, iterator=iterator) + + modin_df = rdf_reader.read() + pd_df = pd_reader.read() + + df_equals(modin_df, pd_df) + def test_read_csv_encoding_976(self): file_name = "modin/pandas/test/data/issue_976.csv" names = [str(i) for i in range(11)] From 92432fadfba0b73d6ffe9a67188b5e97f1c0b17c Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Fri, 15 Sep 2023 14:19:48 +0200 Subject: [PATCH 16/32] FIX-#5164: Fix unwrap_partitions for virtual partitions when `axis=None` (#6560) Co-authored-by: Rehan Durrani Signed-off-by: Anatoly Myachev --- .../dataframe/pandas/partitions.py | 25 +++++++++++++++++-- modin/test/test_partition_api.py | 22 ++++++++++++++++ 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/modin/distributed/dataframe/pandas/partitions.py b/modin/distributed/dataframe/pandas/partitions.py index aa61b024b1d..1366826c6bf 100644 --- a/modin/distributed/dataframe/pandas/partitions.py +++ b/modin/distributed/dataframe/pandas/partitions.py @@ -23,18 +23,30 @@ if TYPE_CHECKING: from modin.core.execution.ray.implementations.pandas_on_ray.partitioning import 
( PandasOnRayDataframePartition, + PandasOnRayDataframeColumnPartition, + PandasOnRayDataframeRowPartition, ) from modin.core.execution.dask.implementations.pandas_on_dask.partitioning import ( PandasOnDaskDataframePartition, + PandasOnDaskDataframeColumnPartition, + PandasOnDaskDataframeRowPartition, ) - from modin.core.execution.unidist.implementations.pandas_on_unidist.partitioning.partition import ( + from modin.core.execution.unidist.implementations.pandas_on_unidist.partitioning import ( PandasOnUnidistDataframePartition, + PandasOnUnidistDataframeColumnPartition, + PandasOnUnidistDataframeRowPartition, ) PartitionUnionType = Union[ PandasOnRayDataframePartition, PandasOnDaskDataframePartition, PandasOnUnidistDataframePartition, + PandasOnRayDataframeColumnPartition, + PandasOnRayDataframeRowPartition, + PandasOnDaskDataframeColumnPartition, + PandasOnDaskDataframeRowPartition, + PandasOnUnidistDataframeColumnPartition, + PandasOnUnidistDataframeRowPartition, ] else: from typing import Any @@ -85,7 +97,10 @@ def _unwrap_partitions() -> list: [p.drain_call_queue() for p in modin_frame._partitions.flatten()] def get_block(partition: PartitionUnionType) -> np.ndarray: - blocks = partition.list_of_blocks + if hasattr(partition, "force_materialization"): + blocks = partition.force_materialization().list_of_blocks + else: + blocks = partition.list_of_blocks assert ( len(blocks) == 1 ), f"Implementation assumes that partition contains a single block, but {len(blocks)} recieved." @@ -109,6 +124,12 @@ def get_block(partition: PartitionUnionType) -> np.ndarray: "PandasOnRayDataframePartition", "PandasOnDaskDataframePartition", "PandasOnUnidistDataframePartition", + "PandasOnRayDataframeColumnPartition", + "PandasOnRayDataframeRowPartition", + "PandasOnDaskDataframeColumnPartition", + "PandasOnDaskDataframeRowPartition", + "PandasOnUnidistDataframeColumnPartition", + "PandasOnUnidistDataframeRowPartition", ): return _unwrap_partitions() raise ValueError( diff --git a/modin/test/test_partition_api.py b/modin/test/test_partition_api.py index a90440aa8fe..64bd0ef8167 100644 --- a/modin/test/test_partition_api.py +++ b/modin/test/test_partition_api.py @@ -114,6 +114,28 @@ def get_df(lib, data): ) +def test_unwrap_virtual_partitions(): + # see #5164 for details + data = test_data["int_data"] + df = pd.DataFrame(data) + virtual_partitioned_df = pd.concat([df] * 10) + actual_partitions = np.array(unwrap_partitions(virtual_partitioned_df, axis=None)) + expected_df = pd.concat([pd.DataFrame(data)] * 10) + expected_partitions = expected_df._query_compiler._modin_frame._partitions + assert expected_partitions.shape == actual_partitions.shape + + for row_idx in range(expected_partitions.shape[0]): + for col_idx in range(expected_partitions.shape[1]): + df_equals( + get_func( + expected_partitions[row_idx][col_idx] + .force_materialization() + .list_of_blocks[0] + ), + get_func(actual_partitions[row_idx][col_idx]), + ) + + @pytest.mark.parametrize("column_widths", [None, "column_widths"]) @pytest.mark.parametrize("row_lengths", [None, "row_lengths"]) @pytest.mark.parametrize("columns", [None, "columns"]) From 60363d98535aab1d2f5168ac9812347afb1c7b4a Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Mon, 18 Sep 2023 21:47:58 +0200 Subject: [PATCH 17/32] FIX-#6572: Execute simple queries row-wise in pandas backend (#6575) Signed-off-by: Dmitry Chigarev --- .../storage_formats/base/query_compiler.py | 19 ++++++ .../storage_formats/pandas/query_compiler.py | 60 +++++++++++++++++++ modin/pandas/dataframe.py | 
19 +++++- .../storage_formats/pandas/test_internals.py | 30 ++++++++++ 4 files changed, 125 insertions(+), 3 deletions(-) diff --git a/modin/core/storage_formats/base/query_compiler.py b/modin/core/storage_formats/base/query_compiler.py index 828dea2f9a1..b46611740f4 100644 --- a/modin/core/storage_formats/base/query_compiler.py +++ b/modin/core/storage_formats/base/query_compiler.py @@ -2270,6 +2270,25 @@ def nsmallest(self, n=5, columns=None, keep="first"): self, n=n, columns=columns, keep=keep ) + @doc_utils.add_refer_to("DataFrame.query") + def rowwise_query(self, expr, **kwargs): + """ + Query columns of the QueryCompiler with a boolean expression row-wise. + + Parameters + ---------- + expr : str + **kwargs : dict + + Returns + ------- + BaseQueryCompiler + New QueryCompiler containing the rows where the boolean expression is satisfied. + """ + raise NotImplementedError( + "Row-wise queries execution is not implemented for the selected backend." + ) + @doc_utils.add_refer_to("DataFrame.eval") def eval(self, expr, **kwargs): """ diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index cc9c90165d1..efb29680269 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -18,6 +18,7 @@ queries for the ``PandasDataframe``. """ +import ast import re import numpy as np import pandas @@ -3073,6 +3074,65 @@ def _list_like_func(self, func, axis, *args, **kwargs): ) return self.__constructor__(new_modin_frame) + def rowwise_query(self, expr, **kwargs): + """ + Query the columns of a ``PandasQueryCompiler`` with a boolean row-wise expression. + + Basically, in row-wise expressions we only allow column names, constants + and other variables captured using the '@' symbol. No function/method + cannot be called inside such expressions. + + Parameters + ---------- + expr : str + Row-wise boolean expression. + **kwargs : dict + Arguments to pass to the ``pandas.DataFrame.query()``. + + Returns + ------- + PandasQueryCompiler + + Raises + ------ + NotImplementedError + In case the passed expression cannot be executed row-wise. + """ + # Walk through the AST and verify it doesn't contain any nodes that + # prevent us from executing the query row-wise (we're basically + # looking for 'ast.Call') + nodes = ast.parse(expr.replace("@", "")).body + is_row_wise_query = True + + while nodes: + node = nodes.pop() + if isinstance(node, ast.Expr): + node = getattr(node, "value", node) + + if isinstance(node, ast.UnaryOp): + nodes.append(node.operand) + elif isinstance(node, ast.BinOp): + nodes.extend([node.left, node.right]) + elif isinstance(node, ast.BoolOp): + nodes.extend(node.values) + elif isinstance(node, ast.Compare): + nodes.extend([node.left] + node.comparators) + elif isinstance(node, (ast.Name, ast.Constant)): + pass + else: + # if we end up here then the expression is no longer simple + # enough to run it row-wise, so exiting + is_row_wise_query = False + break + + if not is_row_wise_query: + raise NotImplementedError("A non row-wise query was passed.") + + def query_builder(df, **modin_internal_kwargs): + return df.query(expr, inplace=False, **kwargs, **modin_internal_kwargs) + + return self.__constructor__(self._modin_frame.filter(1, query_builder)) + def _callable_func(self, func, axis, *args, **kwargs): """ Apply passed function to each row/column. 
diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index fb3cf666b88..923564a6c5b 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1562,10 +1562,23 @@ def query(self, expr, inplace=False, **kwargs): # noqa: PR01, RT01, D200 Query the columns of a ``DataFrame`` with a boolean expression. """ self._update_var_dicts_in_kwargs(expr, kwargs) + self._validate_eval_query(expr, **kwargs) inplace = validate_bool_kwarg(inplace, "inplace") - new_query_compiler = pandas.DataFrame.query( - self, expr, inplace=False, **kwargs - )._query_compiler + # HACK: this condition kind of breaks the idea of backend agnostic API as all queries + # _should_ work fine for all of the engines using the `pandas.DataFrame.query(...)` approach. + # However, at this point we know that we can execute simple queries way more efficiently + # using the QC's API directly in case of pandas backend. Ideally, we have to make it work + # with the 'pandas.query' approach as well as the direct QC call does. But investigating + # and fixing the root cause of the perf difference appears to be much more complicated + # than putting this hack here. Hopefully, we'll get rid of it soon: + # https://github.com/modin-project/modin/issues/6499 + try: + new_query_compiler = self._query_compiler.rowwise_query(expr, **kwargs) + except NotImplementedError: + # a non row-wise query was passed, falling back to pandas implementation + new_query_compiler = pandas.DataFrame.query( + self, expr, inplace=False, **kwargs + )._query_compiler return self._create_or_update_from_compiler(new_query_compiler, inplace) def rename( diff --git a/modin/test/storage_formats/pandas/test_internals.py b/modin/test/storage_formats/pandas/test_internals.py index d852a7b1db4..3a7a6992608 100644 --- a/modin/test/storage_formats/pandas/test_internals.py +++ b/modin/test/storage_formats/pandas/test_internals.py @@ -1079,3 +1079,33 @@ def test_reindex_preserve_dtypes(kwargs): reindexed_df = df.reindex(**kwargs) assert reindexed_df._query_compiler._modin_frame.has_materialized_dtypes + + +def test_query_dispatching(): + """ + Test whether the logic of determining whether the passed query + can be performed row-wise works correctly in ``PandasQueryCompiler.rowwise_query()``. + + The tested method raises a ``NotImplementedError`` if the query cannot be performed row-wise + and raises nothing if it can.
+ """ + qc = pd.DataFrame( + {"a": [1], "b": [2], "c": [3], "d": [4], "e": [5]} + )._query_compiler + + local_var = 10 # noqa: F841 (unused variable) + + # these queries should be performed row-wise (so no exception) + qc.rowwise_query("a < 1") + qc.rowwise_query("a < b") + qc.rowwise_query("a < (b + @local_var) * c > 10") + + # these queries cannot be performed row-wise (so they must raise an exception) + with pytest.raises(NotImplementedError): + qc.rowwise_query("a < b[0]") + with pytest.raises(NotImplementedError): + qc.rowwise_query("a < b.min()") + with pytest.raises(NotImplementedError): + qc.rowwise_query("a < (b + @local_var + (b - e.min())) * c > 10") + with pytest.raises(NotImplementedError): + qc.rowwise_query("a < b.size") From 4d10294f00787cd5b07c2748ef815febe6dd6f4c Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Tue, 26 Sep 2023 18:15:04 +0200 Subject: [PATCH 18/32] FIX-#6601: 'sort_values' shouldn't affect source dataframe/series (#6603) Signed-off-by: Anatoly Myachev --- modin/core/dataframe/pandas/dataframe/dataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 7754e5df1f2..5ed8506c3fe 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -2430,7 +2430,7 @@ def sort_function(df): # pragma: no cover # We perform the final steps of the sort on full axis partitions, so we know that the # length of each partition is the full length of the dataframe. if self.has_materialized_columns: - self._set_axis_lengths_cache([len(self.columns)], axis=axis.value ^ 1) + result._set_axis_lengths_cache([len(self.columns)], axis=axis.value ^ 1) if kwargs.get("ignore_index", False): result.index = RangeIndex(len(self.get_axis(axis.value))) From 1347797dbea7b352cdbae36e401a670611608ef8 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 27 Sep 2023 14:25:33 +0200 Subject: [PATCH 19/32] FIX-#6607: Fix incorrect cache after '.sort_values()' (#6608) Signed-off-by: Dmitry Chigarev --- .../dataframe/pandas/dataframe/dataframe.py | 34 ++++++--- .../storage_formats/pandas/test_internals.py | 70 ++++++++++++++++--- 2 files changed, 84 insertions(+), 20 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 5ed8506c3fe..d850cac69bd 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -2265,7 +2265,7 @@ def combine_and_apply( ) def _apply_func_to_range_partitioning( - self, key_column, func, ascending=True, **kwargs + self, key_column, func, ascending=True, preserve_columns=False, **kwargs ): """ Reshuffle data so it would be range partitioned and then apply the passed function row-wise. @@ -2278,6 +2278,8 @@ def _apply_func_to_range_partitioning( Function to apply against partitions. ascending : bool, default: True Whether the range should be built in ascending or descending order. + preserve_columns : bool, default: False + If the columns cache should be preserved (specify this flag if `func` doesn't change column labels). **kwargs : dict Additional arguments to forward to the range builder function. 
@@ -2288,7 +2290,14 @@ def _apply_func_to_range_partitioning( """ # If there's only one row partition can simply apply the function row-wise without the need to reshuffle if self._partitions.shape[0] == 1: - return self.apply_full_axis(axis=1, func=func) + result = self.apply_full_axis( + axis=1, + func=func, + new_columns=self.copy_columns_cache() if preserve_columns else None, + ) + if preserve_columns: + result._set_axis_lengths_cache(self._column_widths_cache, axis=1) + return result ideal_num_new_partitions = len(self._partitions) m = len(self.index) / ideal_num_new_partitions @@ -2365,7 +2374,14 @@ def _apply_func_to_range_partitioning( func, ) - return self.__constructor__(new_partitions) + result = self.__constructor__(new_partitions) + if preserve_columns: + result.set_columns_cache(self.copy_columns_cache()) + # We perform the final steps of the sort on full axis partitions, so we know that the + # length of each partition is the full length of the dataframe. + if self.has_materialized_columns: + result._set_axis_lengths_cache([len(self.columns)], axis=1) + return result @lazy_metadata_decorator(apply_axis="both") def sort_by( @@ -2422,15 +2438,13 @@ def sort_function(df): # pragma: no cover ) result = self._apply_func_to_range_partitioning( - key_column=columns[0], func=sort_function, ascending=ascending, **kwargs + key_column=columns[0], + func=sort_function, + ascending=ascending, + preserve_columns=True, + **kwargs, ) - - result.set_axis_cache(self.copy_axis_cache(axis.value ^ 1), axis=axis.value ^ 1) result.set_dtypes_cache(self.copy_dtypes_cache()) - # We perform the final steps of the sort on full axis partitions, so we know that the - # length of each partition is the full length of the dataframe. - if self.has_materialized_columns: - result._set_axis_lengths_cache([len(self.columns)], axis=axis.value ^ 1) if kwargs.get("ignore_index", False): result.index = RangeIndex(len(self.get_axis(axis.value))) diff --git a/modin/test/storage_formats/pandas/test_internals.py b/modin/test/storage_formats/pandas/test_internals.py index 3a7a6992608..571ce9fcf6c 100644 --- a/modin/test/storage_formats/pandas/test_internals.py +++ b/modin/test/storage_formats/pandas/test_internals.py @@ -117,20 +117,30 @@ def construct_modin_df_by_scheme(pandas_df, partitioning_scheme): return md_df -def validate_partitions_cache(df): - """Assert that the ``PandasDataframe`` shape caches correspond to the actual partition's shapes.""" - row_lengths = df._row_lengths_cache - column_widths = df._column_widths_cache +def validate_partitions_cache(df, axis=None): + """ + Assert that the ``PandasDataframe`` shape caches correspond to the actual partition's shapes. - assert row_lengths is not None - assert column_widths is not None - assert df._partitions.shape[0] == len(row_lengths) - assert df._partitions.shape[1] == len(column_widths) + Parameters + ---------- + df : PandasDataframe + axis : int, optional + An axis to verify the cache for. If not specified, verify cache for both of the axes. 
+ """ + axis = [0, 1] if axis is None else [axis] + + axis_lengths = [df._row_lengths_cache, df._column_widths_cache] + + for ax in axis: + assert axis_lengths[ax] is not None + assert df._partitions.shape[ax] == len(axis_lengths[ax]) for i in range(df._partitions.shape[0]): for j in range(df._partitions.shape[1]): - assert df._partitions[i, j].length() == row_lengths[i] - assert df._partitions[i, j].width() == column_widths[j] + if 0 in axis: + assert df._partitions[i, j].length() == axis_lengths[0][i] + if 1 in axis: + assert df._partitions[i, j].width() == axis_lengths[1][j] def test_aligning_blocks(): @@ -1109,3 +1119,43 @@ def test_query_dispatching(): qc.rowwise_query("a < (b + @local_var + (b - e.min())) * c > 10") with pytest.raises(NotImplementedError): qc.rowwise_query("a < b.size") + + +def test_sort_values_cache(): + """ + Test that the column widths cache after ``.sort_values()`` is valid: + https://github.com/modin-project/modin/issues/6607 + """ + # 1 row partition and 2 column partitions, in this case '.sort_values()' will use + # row-wise implementation and so the column widths WILL NOT be changed + modin_df = construct_modin_df_by_scheme( + pandas.DataFrame({f"col{i}": range(100) for i in range(64)}), + partitioning_scheme={"row_lengths": [100], "column_widths": [32, 32]}, + ) + mf_initial = modin_df._query_compiler._modin_frame + + mf_res = modin_df.sort_values("col0")._query_compiler._modin_frame + # check that row-wise implementation was indeed used (col widths were not changed) + assert mf_res._column_widths_cache == [32, 32] + # check that the cache and actual col widths match + validate_partitions_cache(mf_res, axis=1) + # check that the initial frame's cache wasn't changed + assert mf_initial._column_widths_cache == [32, 32] + validate_partitions_cache(mf_initial, axis=1) + + # 2 row partition and 2 column partitions, in this case '.sort_values()' will use + # range-partitioning implementation and so the column widths WILL be changed + modin_df = construct_modin_df_by_scheme( + pandas.DataFrame({f"col{i}": range(100) for i in range(64)}), + partitioning_scheme={"row_lengths": [50, 50], "column_widths": [32, 32]}, + ) + mf_initial = modin_df._query_compiler._modin_frame + + mf_res = modin_df.sort_values("col0")._query_compiler._modin_frame + # check that range-partitioning implementation was indeed used (col widths were changed) + assert mf_res._column_widths_cache == [64] + # check that the cache and actual col widths match + validate_partitions_cache(mf_res, axis=1) + # check that the initial frame's cache wasn't changed + assert mf_initial._column_widths_cache == [32, 32] + validate_partitions_cache(mf_initial, axis=1) From aa918069b0ed1fae6a5e5927550341057ae25a31 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Fri, 29 Sep 2023 10:30:11 +0200 Subject: [PATCH 20/32] FIX-#6602: refactor `join` to avoid `distributing a dict object` warning (#6612) Signed-off-by: Anatoly Myachev --- modin/pandas/dataframe.py | 2 +- modin/pandas/test/dataframe/test_join_sort.py | 23 +++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 923564a6c5b..6ae369079ef 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1123,7 +1123,7 @@ def join( if isinstance(other, Series): if other.name is None: raise ValueError("Other Series must have a name") - other = self.__constructor__({other.name: other}) + other = self.__constructor__(other) if on is not None: return 
self.__constructor__( query_compiler=self._query_compiler.join( diff --git a/modin/pandas/test/dataframe/test_join_sort.py b/modin/pandas/test/dataframe/test_join_sort.py index 970bf98717f..417d871582b 100644 --- a/modin/pandas/test/dataframe/test_join_sort.py +++ b/modin/pandas/test/dataframe/test_join_sort.py @@ -11,7 +11,10 @@ # ANY KIND, either express or implied. See the License for the specific language # governing permissions and limitations under the License. +import warnings + import pytest +import matplotlib import numpy as np import pandas import matplotlib @@ -181,6 +184,26 @@ def test_join_5203(): dfs[0].join([dfs[1], dfs[2]], how="inner", on="a") +def test_join_6602(): + abbreviations = pd.Series( + ["Major League Baseball", "National Basketball Association"], + index=["MLB", "NBA"], + ) + teams = pd.DataFrame( + { + "name": ["Mariners", "Lakers"] * 50, + "league_abbreviation": ["MLB", "NBA"] * 50, + } + ) + + with warnings.catch_warnings(): + # check that join doesn't show UserWarning + warnings.filterwarnings( + "error", "Distributing object", category=UserWarning + ) + teams.set_index("league_abbreviation").join(abbreviations.rename("league_name")) + + @pytest.mark.parametrize( "test_data, test_data2", [ From dddd84796e2e7d8c22690d1107191cf8712e7fd1 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Fri, 29 Sep 2023 17:50:14 +0200 Subject: [PATCH 21/32] FIX-#6600: fix usage of list of UDF functions in 'Series.groupby.agg' (#6613) Signed-off-by: Anatoly Myachev --- modin/pandas/groupby.py | 9 ++++----- modin/pandas/test/test_groupby.py | 18 ++++++++++++++++++ 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/modin/pandas/groupby.py b/modin/pandas/groupby.py index 6a09de684b9..56975172020 100644 --- a/modin/pandas/groupby.py +++ b/modin/pandas/groupby.py @@ -1670,10 +1670,9 @@ def _try_get_str_func(self, fn): Returns ------- str, list - If `fn` is a callable, return its name if it's a method of the groupby - object, otherwise return `fn` itself. If `fn` is a string, return it. - If `fn` is an Iterable, return a list of _try_get_str_func applied to - each element of `fn`. + If `fn` is a callable, return its name, otherwise return `fn` itself. + If `fn` is a string, return it. If `fn` is an Iterable, return a list + of _try_get_str_func applied to each element of `fn`. """ if not isinstance(fn, str) and isinstance(fn, Iterable): return [self._try_get_str_func(f) for f in fn] @@ -1683,7 +1682,7 @@ def _try_get_str_func(self, fn): elif fn is np.min: # np.min is called "amin", so it's not a method of the groupby object. 
return "amin" - return fn.__name__ if callable(fn) and fn.__name__ in dir(self) else fn + return fn.__name__ if callable(fn) else fn def value_counts( self, diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index af42a12bc64..397c47d6de0 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -1082,6 +1082,24 @@ def test_series_groupby(by, as_index_series_or_dataframe): eval_groups(modin_groupby, pandas_groupby) +def test_agg_udf_6600(): + data = { + "name": ["Mariners", "Lakers"] * 50, + "league_abbreviation": ["MLB", "NBA"] * 50, + } + modin_teams, pandas_teams = create_test_dfs(data) + + def my_first_item(s): + return s.iloc[0] + + for agg in (my_first_item, [my_first_item], ["nunique", my_first_item]): + eval_general( + modin_teams, + pandas_teams, + operation=lambda df: df.groupby("league_abbreviation").name.agg(agg), + ) + + def test_multi_column_groupby(): pandas_df = pandas.DataFrame( { From 9d188d43d8120d6b4d2a07bd534ca9ca6c7899ab Mon Sep 17 00:00:00 2001 From: Igor Zamyatin Date: Thu, 5 Oct 2023 05:39:44 -0500 Subject: [PATCH 22/32] FIX-#6628: Allow groupby diff for dates (#6631) Signed-off-by: izamyati --- modin/pandas/groupby.py | 11 +++++++++-- modin/pandas/test/dataframe/test_join_sort.py | 1 - modin/pandas/test/test_groupby.py | 15 +++++++++++++++ 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/modin/pandas/groupby.py b/modin/pandas/groupby.py index 56975172020..04ba95a7804 100644 --- a/modin/pandas/groupby.py +++ b/modin/pandas/groupby.py @@ -18,7 +18,12 @@ from pandas.core.apply import reconstruct_func from pandas.errors import SpecificationError import pandas.core.groupby -from pandas.core.dtypes.common import is_list_like, is_numeric_dtype, is_integer +from pandas.core.dtypes.common import ( + is_datetime64_any_dtype, + is_integer, + is_list_like, + is_numeric_dtype, +) from pandas._libs.lib import no_default import pandas.core.common as com from types import BuiltinFunctionType @@ -1267,7 +1272,9 @@ def diff(self, periods=1, axis=0): for col, dtype in self._df.dtypes.items(): # can't calculate diff on non-numeric columns, so check for non-numeric # columns that are not included in the `by` - if not is_numeric_dtype(dtype) and not ( + if not ( + is_numeric_dtype(dtype) or is_datetime64_any_dtype(dtype) + ) and not ( isinstance(self._by, BaseQueryCompiler) and col in self._by.columns ): raise TypeError(f"unsupported operand type for -: got {dtype}") diff --git a/modin/pandas/test/dataframe/test_join_sort.py b/modin/pandas/test/dataframe/test_join_sort.py index 417d871582b..348dccb7f43 100644 --- a/modin/pandas/test/dataframe/test_join_sort.py +++ b/modin/pandas/test/dataframe/test_join_sort.py @@ -14,7 +14,6 @@ import warnings import pytest -import matplotlib import numpy as np import pandas import matplotlib diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index 397c47d6de0..4603561e418 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -2671,6 +2671,21 @@ def test_groupby_pct_change_diff_6194(): ) +def test_groupby_datetime_diff_6628(): + dates = pd.date_range(start="2023-01-01", periods=10, freq="W") + df = pd.DataFrame( + { + "date": dates, + "group": "A", + } + ) + eval_general( + df, + df._to_pandas(), + lambda df: df.groupby("group").diff(), + ) + + def eval_rolling(md_window, pd_window): eval_general(md_window, pd_window, lambda window: window.count()) eval_general(md_window, pd_window, lambda window: 
window.sum()) From 80df280432d12a3bffa33b65b3b12e7a8df43b42 Mon Sep 17 00:00:00 2001 From: Andrey Pavlenko Date: Mon, 9 Oct 2023 16:36:06 +0200 Subject: [PATCH 23/32] FIX-#6635: HDK: read_csv(): treat object dtype as string (#6636) Signed-off-by: Andrey Pavlenko --- .../native/implementations/hdk_on_native/io/io.py | 2 +- .../hdk_on_native/test/test_dataframe.py | 13 +++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/io/io.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/io/io.py index dd102edbc30..d1c5c156896 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/io/io.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/io/io.py @@ -292,7 +292,7 @@ def _dtype_to_arrow(cls, dtype): tname = dtype if isinstance(dtype, str) else dtype.name if tname == "category": return pa.dictionary(index_type=pa.int32(), value_type=pa.string()) - elif tname == "string": + elif tname == "string" or tname == "object": return pa.string() else: return pa.from_numpy_dtype(tname) diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/test/test_dataframe.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/test/test_dataframe.py index f85b0c99b9a..f2ed946dd76 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/test/test_dataframe.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/test/test_dataframe.py @@ -396,6 +396,19 @@ def test(df, lib, **kwargs): run_and_compare(test, data={}) + def test_read_csv_dtype_object(self): + with pytest.warns(UserWarning) as warns: + with ensure_clean(".csv") as file: + with open(file, "w") as f: + f.write("test\ntest") + + def test(**kwargs): + return pd.read_csv(file, dtype={"test": "object"}) + + run_and_compare(test, data={}) + for warn in warns.list: + assert not re.match(r".*defaulting to pandas.*", str(warn)) + class TestMasks: data = { From 1a509165d7ac81463cbdab99cb21661693e08254 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Mon, 9 Oct 2023 17:23:54 +0200 Subject: [PATCH 24/32] FIX-#6637: Fix 'skiprows' parameter usage for 'read_excel' (#6638) Signed-off-by: Anatoly Myachev --- modin/core/io/text/excel_dispatcher.py | 7 +++++++ modin/core/storage_formats/pandas/parsers.py | 3 ++- modin/pandas/test/test_io.py | 11 +++++++++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/modin/core/io/text/excel_dispatcher.py b/modin/core/io/text/excel_dispatcher.py index b9235abcae6..71a9745910f 100644 --- a/modin/core/io/text/excel_dispatcher.py +++ b/modin/core/io/text/excel_dispatcher.py @@ -59,6 +59,13 @@ def _read(cls, io, **kwargs): **kwargs ) + if kwargs.get("skiprows") is not None: + return cls.single_worker_read( + io, + reason="Modin doesn't support 'skiprows' parameter of `read_excel`", + **kwargs + ) + if isinstance(io, bytes): io = BytesIO(io) diff --git a/modin/core/storage_formats/pandas/parsers.py b/modin/core/storage_formats/pandas/parsers.py index 384f991af1f..ef298608d7e 100644 --- a/modin/core/storage_formats/pandas/parsers.py +++ b/modin/core/storage_formats/pandas/parsers.py @@ -580,7 +580,6 @@ def parse(fname, **kwargs): num_splits = kwargs.pop("num_splits", None) start = kwargs.pop("start", None) end = kwargs.pop("end", None) - _skiprows = kwargs.pop("skiprows") excel_header = kwargs.get("_header") sheet_name = kwargs.get("sheet_name", 0) footer = b"" @@ 
-589,6 +588,8 @@ def parse(fname, **kwargs): if start is None or end is None: return pandas.read_excel(fname, **kwargs) + _skiprows = kwargs.pop("skiprows") + from zipfile import ZipFile import openpyxl from openpyxl.worksheet._reader import WorksheetReader diff --git a/modin/pandas/test/test_io.py b/modin/pandas/test/test_io.py index b4babb0dddf..633afe2b637 100644 --- a/modin/pandas/test/test_io.py +++ b/modin/pandas/test/test_io.py @@ -1898,6 +1898,17 @@ def test_read_excel(self, pathlike, make_excel_file): io=Path(unique_filename) if pathlike else unique_filename, ) + @check_file_leaks + @pytest.mark.parametrize("skiprows", [2, [1, 3], lambda x: x in [0, 2]]) + def test_read_excel_skiprows(self, skiprows, make_excel_file): + eval_io( + fn_name="read_excel", + # read_excel kwargs + io=make_excel_file(), + skiprows=skiprows, + check_kwargs_callable=False, + ) + @check_file_leaks @pytest.mark.parametrize( "dtype_backend", [lib.no_default, "numpy_nullable", "pyarrow"] From f8657683266e30d982f1eabd3c43344d1d62bed8 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Wed, 11 Oct 2023 18:58:40 +0200 Subject: [PATCH 25/32] FIX-#6642: fix 'modin.numpy.array.sum' on HDK (#6643) Signed-off-by: Anatoly Myachev --- modin/experimental/core/storage_formats/hdk/query_compiler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/experimental/core/storage_formats/hdk/query_compiler.py b/modin/experimental/core/storage_formats/hdk/query_compiler.py index 6fb82148d7c..3d97f2c0887 100644 --- a/modin/experimental/core/storage_formats/hdk/query_compiler.py +++ b/modin/experimental/core/storage_formats/hdk/query_compiler.py @@ -396,7 +396,7 @@ def min(self, **kwargs): return self._agg("min", **kwargs) def sum(self, **kwargs): - min_count = kwargs.pop("min_count") + min_count = kwargs.pop("min_count", 0) if min_count != 0: raise NotImplementedError( f"HDK's sum does not support such set of parameters: min_count={min_count}." From 27a4917562eeb74a7b80e5c811f85a5336ba2cfb Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 11 Oct 2023 23:34:45 +0300 Subject: [PATCH 26/32] FIX-#4507: Do not call 'ray.get()' inside of the kernel executing call queues (#6633) Signed-off-by: Dmitry Chigarev --- modin/core/execution/ray/common/utils.py | 108 ++++++++++++++++ .../pandas_on_ray/partitioning/partition.py | 39 ++++-- .../partitioning/virtual_partition.py | 63 +++++---- .../storage_formats/pandas/test_internals.py | 121 ++++++++++++++++++ 4 files changed, 293 insertions(+), 38 deletions(-) diff --git a/modin/core/execution/ray/common/utils.py b/modin/core/execution/ray/common/utils.py index 845159eb0ae..7c27221a76a 100644 --- a/modin/core/execution/ray/common/utils.py +++ b/modin/core/execution/ray/common/utils.py @@ -287,3 +287,111 @@ def deserialize(obj): # pragma: no cover return dict(zip(obj.keys(), RayWrapper.materialize(list(obj.values())))) else: return obj + + +def deconstruct_call_queue(call_queue): + """ + Deconstruct the passed call queue into a 1D list. + + This is required, so the call queue can be then passed to a Ray's kernel + as a variable-length argument ``kernel(*queue)`` so the Ray engine + automatically materialize all the futures that the queue might have contained. + + Parameters + ---------- + call_queue : list[list[func, args, kwargs], ...] + + Returns + ------- + num_funcs : int + The number of functions in the call queue. + arg_lengths : list of ints + The number of positional arguments for each function in the call queue. 
+ kw_key_lengths : list of ints + The number of key-word arguments for each function in the call queue. + kw_value_lengths : 2D list of dict{"len": int, "was_iterable": bool} + Description of keyword arguments for each function. For example, `kw_value_lengths[i][j]` + describes the j-th keyword argument of the i-th function in the call queue. + The description consists of the length of the argument and whether it's a list at all + (for example, {"len": 1, "was_iterable": False} describes a non-list argument). + unfolded_queue : list + A 1D call queue that can be reconstructed using the ``reconstruct_call_queue`` function. + """ + num_funcs = len(call_queue) + arg_lengths = [] + kw_key_lengths = [] + kw_value_lengths = [] + unfolded_queue = [] + for call in call_queue: + unfolded_queue.append(call[0]) + unfolded_queue.extend(call[1]) + arg_lengths.append(len(call[1])) + # unfold keyword dict + ## unfold keys + unfolded_queue.extend(call[2].keys()) + kw_key_lengths.append(len(call[2])) + ## unfold values + value_lengths = [] + for value in call[2].values(): + was_iterable = True + if not isinstance(value, (list, tuple)): + was_iterable = False + value = (value,) + unfolded_queue.extend(value) + value_lengths.append({"len": len(value), "was_iterable": was_iterable}) + kw_value_lengths.append(value_lengths) + + return num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, *unfolded_queue + + +def reconstruct_call_queue( + num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, unfolded_queue +): + """ + Reconstruct the original call queue from the result of ``deconstruct_call_queue()``. + + Parameters + ---------- + num_funcs : int + The number of functions in the call queue. + arg_lengths : list of ints + The number of positional arguments for each function in the call queue. + kw_key_lengths : list of ints + The number of key-word arguments for each function in the call queue. + kw_value_lengths : 2D list of dict{"len": int, "was_iterable": bool} + Description of keyword arguments for each function. For example, `kw_value_lengths[i][j]` + describes the j-th keyword argument of the i-th function in the call queue. + The description consists of the length of the argument and whether it's a list at all + (for example, {"len": 1, "was_iterable": False} describes a non-list argument). + unfolded_queue : list + A 1D call queue that is the result of the ``deconstruct_call_queue()`` function. + + Returns + ------- + list[list[func, args, kwargs], ...] + Original call queue.
+ """ + items_took = 0 + + def take_n_items(n): + nonlocal items_took + res = unfolded_queue[items_took : items_took + n] + items_took += n + return res + + call_queue = [] + for i in range(num_funcs): + func = take_n_items(1)[0] + args = take_n_items(arg_lengths[i]) + kw_keys = take_n_items(kw_key_lengths[i]) + kwargs = {} + value_lengths = kw_value_lengths[i] + for key, value_length in zip(kw_keys, value_lengths): + vals = take_n_items(value_length["len"]) + if value_length["len"] == 1 and not value_length["was_iterable"]: + vals = vals[0] + kwargs[key] = vals + + call_queue.append((func, args, kwargs)) + + return call_queue diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py index a7caabf2f9d..7326d755dbb 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py @@ -16,7 +16,11 @@ import ray from ray.util import get_node_ip_address -from modin.core.execution.ray.common.utils import deserialize, ObjectIDType +from modin.core.execution.ray.common.utils import ( + ObjectIDType, + deconstruct_call_queue, + reconstruct_call_queue, +) from modin.core.execution.ray.common import RayWrapper from modin.core.dataframe.pandas.partitioning.partition import PandasDataframePartition from modin.pandas.indexing import compute_sliced_len @@ -97,7 +101,9 @@ def apply(self, func, *args, **kwargs): self._is_debug(log) and log.debug( f"SUBMIT::_apply_list_of_funcs::{self._identity}" ) - result, length, width, ip = _apply_list_of_funcs.remote(call_queue, data) + result, length, width, ip = _apply_list_of_funcs.remote( + data, *deconstruct_call_queue(call_queue) + ) else: # We handle `len(call_queue) == 1` in a different way because # this dramatically improves performance. @@ -128,7 +134,7 @@ def drain_call_queue(self): new_length, new_width, self._ip_cache, - ) = _apply_list_of_funcs.remote(call_queue, data) + ) = _apply_list_of_funcs.remote(data, *deconstruct_call_queue(call_queue)) else: # We handle `len(call_queue) == 1` in a different way because # this dramatically improves performance. @@ -391,16 +397,29 @@ def _apply_func(partition, func, *args, **kwargs): # pragma: no cover @ray.remote(num_returns=4) -def _apply_list_of_funcs(call_queue, partition): # pragma: no cover +def _apply_list_of_funcs( + partition, num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, *futures +): # pragma: no cover """ Execute all operations stored in the call queue on the partition in a worker process. Parameters ---------- - call_queue : list - A call queue that needs to be executed on the partition. partition : pandas.DataFrame A pandas DataFrame the call queue needs to be executed on. + num_funcs : int + The number of functions in the call queue. + arg_lengths : list of ints + The number of positional arguments for each function in the call queue. + kw_key_lengths : list of ints + The number of key-word arguments for each function in the call queue. + kw_value_lengths : 2D list of dict{"len": int, "was_iterable": bool} + Description of keyword arguments for each function. For example, `kw_value_lengths[i][j]` + describes the j-th keyword argument of the i-th function in the call queue. + The describtion contains of the lengths of the argument and whether it's a list at all + (for example, {"len": 1, "was_iterable": False} describes a non-list argument). 
+ *futures : list + A 1D call queue that is result of the ``deconstruct_call_queue()`` function. Returns ------- @@ -413,10 +432,10 @@ def _apply_list_of_funcs(call_queue, partition): # pragma: no cover str The node IP address of the worker process. """ - for func, f_args, f_kwargs in call_queue: - func = deserialize(func) - args = deserialize(f_args) - kwargs = deserialize(f_kwargs) + call_queue = reconstruct_call_queue( + num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, futures + ) + for func, args, kwargs in call_queue: try: partition = func(partition, *args, **kwargs) # Sometimes Arrow forces us to make a copy of an object before we operate on it. We diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py index f81290faf24..816b1b717a7 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py @@ -20,8 +20,8 @@ from modin.core.dataframe.pandas.partitioning.axis_partition import ( PandasDataframeAxisPartition, ) -from modin.core.execution.ray.common.utils import deserialize from modin.core.execution.ray.common import RayWrapper + from .partition import PandasOnRayDataframePartition from modin.utils import _inherit_docstrings @@ -115,12 +115,13 @@ def deploy_splitting_func( else num_splits, ).remote( cls._get_deploy_split_func(), - axis, - func, - f_args, - f_kwargs, + *f_args, num_splits, *partitions, + axis=axis, + f_to_deploy=func, + f_len_args=len(f_args), + f_kwargs=f_kwargs, extract_metadata=extract_metadata, ) @@ -176,13 +177,14 @@ def deploy_axis_func( **({"max_retries": max_retries} if max_retries is not None else {}), ).remote( cls._get_deploy_axis_func(), - axis, - func, - f_args, - f_kwargs, + *f_args, num_splits, maintain_partitioning, *partitions, + axis=axis, + f_to_deploy=func, + f_len_args=len(f_args), + f_kwargs=f_kwargs, manual_partition=manual_partition, lengths=lengths, ) @@ -231,14 +233,15 @@ def deploy_func_between_two_axis_partitions( num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN) ).remote( PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions, - axis, - func, - f_args, - f_kwargs, + *f_args, num_splits, len_of_left, other_shape, *partitions, + axis=axis, + f_to_deploy=func, + f_len_args=len(f_args), + f_kwargs=f_kwargs, ) def wait(self): @@ -261,11 +264,11 @@ class PandasOnRayDataframeRowPartition(PandasOnRayDataframeVirtualPartition): @ray.remote def _deploy_ray_func( deployer, + *positional_args, axis, f_to_deploy, - f_args, + f_len_args, f_kwargs, - *args, extract_metadata=True, **kwargs, ): # pragma: no cover @@ -274,29 +277,32 @@ def _deploy_ray_func( This is ALWAYS called on either ``PandasDataframeAxisPartition.deploy_axis_func`` or ``PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions``, which both - serve to deploy another dataframe function on a Ray worker process. The provided ``f_args`` - is thus are deserialized here (on the Ray worker) before the function is called (``f_kwargs`` - will never contain more Ray objects, and thus does not require deserialization). + serve to deploy another dataframe function on a Ray worker process. The provided `positional_args` + contains positional arguments for both: `deployer` and for `f_to_deploy`, the parameters can be separated + using the `f_len_args` value. 
The parameters are combined so they will be deserialized by Ray before the + kernel is executed (`f_kwargs` will never contain more Ray objects, and thus does not require deserialization). Parameters ---------- deployer : callable A `PandasDataFrameAxisPartition.deploy_*` method that will call ``f_to_deploy``. + *positional_args : list + The first `f_len_args` elements in this list represent positional arguments + to pass to the `f_to_deploy`. The rest are positional arguments that will be + passed to `deployer`. axis : {0, 1} - The axis to perform the function along. + The axis to perform the function along. This argument is keyword only. f_to_deploy : callable or RayObjectID - The function to deploy. - f_args : list or tuple - Positional arguments to pass to ``f_to_deploy``. + The function to deploy. This argument is keyword only. + f_len_args : int + Number of positional arguments to pass to ``f_to_deploy``. This argument is keyword only. f_kwargs : dict - Keyword arguments to pass to ``f_to_deploy``. - *args : list - Positional arguments to pass to ``deployer``. + Keyword arguments to pass to ``f_to_deploy``. This argument is keyword only. extract_metadata : bool, default: True Whether to return metadata (length, width, ip) of the result. Passing `False` may relax the load on object storage as the remote function would return 4 times fewer futures. Passing `False` makes sense for temporary results where you know for sure that the - metadata will never be requested. + metadata will never be requested. This argument is keyword only. **kwargs : dict Keyword arguments to pass to ``deployer``. @@ -309,8 +315,9 @@ def _deploy_ray_func( ----- Ray functions are not detected by codecov (thus pragma: no cover). """ - f_args = deserialize(f_args) - result = deployer(axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs) + f_args = positional_args[:f_len_args] + deploy_args = positional_args[f_len_args:] + result = deployer(axis, f_to_deploy, f_args, f_kwargs, *deploy_args, **kwargs) if not extract_metadata: return result ip = get_node_ip_address() diff --git a/modin/test/storage_formats/pandas/test_internals.py b/modin/test/storage_formats/pandas/test_internals.py index 571ce9fcf6c..157e6ab819c 100644 --- a/modin/test/storage_formats/pandas/test_internals.py +++ b/modin/test/storage_formats/pandas/test_internals.py @@ -1159,3 +1159,124 @@ def test_sort_values_cache(): # check that the initial frame's cache wasn't changed assert mf_initial._column_widths_cache == [32, 32] validate_partitions_cache(mf_initial, axis=1) + + +class DummyFuture: + """ + A dummy object emulating future's behaviour, this class is used in ``test_call_queue_serialization``. + + It stores a random numeric value representing its data and `was_materialized` state. + Initially this object is considered to be serialized, the state can be changed by calling + the ``.materialize()`` method. 
+ """ + + def __init__(self): + self._value = np.random.randint(0, 1_000_000) + self._was_materialized = False + + def materialize(self): + self._was_materialized = True + return self + + def __eq__(self, other): + if isinstance(other, type(self)) and self._value == other._value: + return True + return False + + +@pytest.mark.parametrize( + "call_queue", + [ + # empty call queue + [], + # a single-function call queue (the function has no argument and it's materialized) + [(0, [], {})], + # a single-function call queue (the function has no argument and it's serialized) + [(DummyFuture(), [], {})], + # a multiple-functions call queue, none of the functions have arguments + [(DummyFuture(), [], {}), (DummyFuture(), [], {}), (0, [], {})], + # a single-function call queue (the function has both positional and keyword arguments) + [ + ( + DummyFuture(), + [DummyFuture()], + { + "a": DummyFuture(), + "b": [DummyFuture()], + "c": [DummyFuture, DummyFuture()], + }, + ) + ], + # a multiple-functions call queue with mixed types of functions/arguments + [ + ( + DummyFuture(), + [1, DummyFuture(), DummyFuture(), [4, 5]], + {"a": [DummyFuture(), 2], "b": DummyFuture(), "c": [1]}, + ), + (0, [], {}), + (0, [1], {}), + (0, [DummyFuture(), DummyFuture()], {}), + ], + ], +) +def test_call_queue_serialization(call_queue): + """ + Test that the process of passing a call queue to Ray's kernel works correctly. + + Before passing a call queue to the kernel that actually executes it, the call queue + is unwrapped into a 1D list using the ``deconstruct_call_queue`` function. After that, + the 1D list is passed as a variable length argument to the kernel ``kernel(*queue)``, + this is done so the Ray engine automatically materialize all the futures that the queue + might have contained. In the end, inside of the kernel, the ``reconstruct_call_queue`` function + is called to rebuild the call queue into its original structure. + + This test emulates the described flow and verifies that it works properly. + """ + from modin.core.execution.ray.implementations.pandas_on_ray.partitioning.partition import ( + deconstruct_call_queue, + reconstruct_call_queue, + ) + + def materialize_queue(*values): + """ + Walk over the `values` and materialize all the future types. + + This function emulates how Ray remote functions materialize their positional arguments. 
+ """ + return [ + val.materialize() if isinstance(val, DummyFuture) else val for val in values + ] + + def assert_everything_materialized(queue): + """Walk over the call queue and verify that all entities there are materialized.""" + + def assert_materialized(obj): + assert ( + isinstance(obj, DummyFuture) and obj._was_materialized + ) or not isinstance(obj, DummyFuture) + + for func, args, kwargs in queue: + assert_materialized(func) + for arg in args: + assert_materialized(arg) + for value in kwargs.values(): + if not isinstance(value, (list, tuple)): + value = [value] + for val in value: + assert_materialized(val) + + ( + num_funcs, + arg_lengths, + kw_key_lengths, + kw_value_lengths, + *queue, + ) = deconstruct_call_queue(call_queue) + queue = materialize_queue(*queue) + reconstructed_queue = reconstruct_call_queue( + num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, queue + ) + + assert call_queue == reconstructed_queue + assert_everything_materialized(reconstructed_queue) From 2b737203d0584b26d16347480d63991c2d45cacc Mon Sep 17 00:00:00 2001 From: Egor Date: Fri, 13 Oct 2023 17:30:56 +0200 Subject: [PATCH 27/32] FIX-#6647: Added init file to make modin/experimental/sql/hdk/query.py part of modin package (#6646) Signed-off-by: Egor Krivov --- modin/experimental/sql/hdk/__init__.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 modin/experimental/sql/hdk/__init__.py diff --git a/modin/experimental/sql/hdk/__init__.py b/modin/experimental/sql/hdk/__init__.py new file mode 100644 index 00000000000..31de5addb64 --- /dev/null +++ b/modin/experimental/sql/hdk/__init__.py @@ -0,0 +1,14 @@ +# Licensed to Modin Development Team under one or more contributor license agreements. +# See the NOTICE file distributed with this work for additional information regarding +# copyright ownership. The Modin Development Team licenses this file to you under the +# Apache License, Version 2.0 (the "License"); you may not use this file except in +# compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under +# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific language +# governing permissions and limitations under the License. + +"""Implementation of HDK SQL functionality.""" From 712cfbee7d2acb3af6ab09e7c6c8bd8f5e4cfade Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Tue, 17 Oct 2023 16:37:08 +0200 Subject: [PATCH 28/32] FIX-#6651: make sure `Series.between` works correctly (#6656) Signed-off-by: Anatoly Myachev Co-authored-by: Dmitry Chigarev --- modin/pandas/series.py | 6 +++--- modin/pandas/test/test_series.py | 9 +++++---- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/modin/pandas/series.py b/modin/pandas/series.py index 45b5316cead..88fb3454c76 100644 --- a/modin/pandas/series.py +++ b/modin/pandas/series.py @@ -686,9 +686,9 @@ def between(self, left, right, inclusive: str = "both"): # noqa: PR01, RT01, D2 """ Return boolean Series equivalent to left <= series <= right. 
""" - return self.__constructor__( - query_compiler=self._query_compiler.between(left, right, inclusive) - ) + # 'pandas.Series.between()' only uses public Series' API, + # so passing a Modin Series there is safe + return pandas.Series.between(self, left, right, inclusive) def combine(self, other, func, fill_value=None): # noqa: PR01, RT01, D200 """ diff --git a/modin/pandas/test/test_series.py b/modin/pandas/test/test_series.py index 0fe8ad944f6..16c105997c7 100644 --- a/modin/pandas/test/test_series.py +++ b/modin/pandas/test/test_series.py @@ -1232,13 +1232,14 @@ def test_array(data): eval_general(modin_series, pandas_series, lambda df: df.array) -@pytest.mark.xfail(reason="Using pandas Series.") @pytest.mark.parametrize("data", test_data_values, ids=test_data_keys) def test_between(data): - modin_series = create_test_series(data) + modin_series, pandas_series = create_test_series(data) - with pytest.raises(NotImplementedError): - modin_series.between(None, None) + df_equals( + modin_series.between(1, 4), + pandas_series.between(1, 4), + ) def test_between_time(): From ed7ade5a253e53d09f57c54f8701bcba70669ba0 Mon Sep 17 00:00:00 2001 From: Igor Zamyatin Date: Sat, 21 Oct 2023 16:09:02 -0500 Subject: [PATCH 29/32] FIX-#6632: Return Series instead of Dataframe for groupby.apply in case of experimental groupby (#6649) Signed-off-by: izamyati Co-authored-by: Dmitry Chigarev --- modin/pandas/groupby.py | 22 +++++++++++++--------- modin/pandas/test/test_groupby.py | 17 +++++++++++++++++ 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/modin/pandas/groupby.py b/modin/pandas/groupby.py index 04ba95a7804..28c429cabc2 100644 --- a/modin/pandas/groupby.py +++ b/modin/pandas/groupby.py @@ -565,16 +565,20 @@ def apply(self, func, *args, **kwargs): if not isinstance(func, BuiltinFunctionType): func = wrap_udf_function(func) - return self._check_index( - self._wrap_aggregation( - qc_method=type(self._query_compiler).groupby_agg, - numeric_only=False, - agg_func=func, - agg_args=args, - agg_kwargs=kwargs, - how="group_wise", - ) + apply_res = self._wrap_aggregation( + qc_method=type(self._query_compiler).groupby_agg, + numeric_only=False, + agg_func=func, + agg_args=args, + agg_kwargs=kwargs, + how="group_wise", ) + reduced_index = pandas.Index([MODIN_UNNAMED_SERIES_LABEL]) + if not isinstance(apply_res, Series) and apply_res.columns.equals( + reduced_index + ): + apply_res = apply_res.squeeze(axis=1) + return self._check_index(apply_res) @property def dtypes(self): diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index 4603561e418..ca40bec96a0 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -2901,3 +2901,20 @@ def test_reshuffling_groupby_on_strings(modify_config): eval_general( modin_df.groupby("col1"), pandas_df.groupby("col1"), lambda grp: grp.mean() ) + + +@pytest.mark.parametrize( + "modify_config", [{ExperimentalGroupbyImpl: True}], indirect=True +) +def test_groupby_apply_series_result(modify_config): + # reproducer from the issue: + # https://github.com/modin-project/modin/issues/6632 + df = pd.DataFrame( + np.random.randint(5, 10, size=5), index=[f"s{i+1}" for i in range(5)] + ) + df["group"] = [1, 1, 2, 2, 3] + + # res = df.groupby('group').apply(lambda x: x.name+2) + eval_general( + df, df._to_pandas(), lambda df: df.groupby("group").apply(lambda x: x.name + 2) + ) From 34a37545e3cf325917e7ebc3cd85280ea2b14626 Mon Sep 17 00:00:00 2001 From: Igor Zamyatin Date: Wed, 25 Oct 2023 15:48:03 -0500 Subject: 
[PATCH 30/32] FIX-#6680: Specify navigation_with_keys=True to fix docs build (#6681) Signed-off-by: izamyati --- docs/conf.py | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/conf.py b/docs/conf.py index 39aad78039b..d140161419d 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -176,6 +176,7 @@ def noop_decorator(*args, **kwargs): "icon": "fas fa-envelope-square", }, ], + "navigation_with_keys": True, } # Custom sidebar templates, must be a dictionary that maps document names From 271ba82c064a6f233007c28d0811cffc0124ff80 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Tue, 14 Nov 2023 20:51:59 +0100 Subject: [PATCH 31/32] FIX-#6594: fix usage of Modin objects inside UDFs for `apply` (#6673) Signed-off-by: Anatoly Myachev --- .../dataframe/pandas/dataframe/dataframe.py | 10 +++++++ .../pandas/partitioning/partition_manager.py | 23 ++++++++++++-- .../pandas_on_dask/dataframe/dataframe.py | 23 ++++++++++++++ .../pandas_on_unidist/dataframe/dataframe.py | 4 +++ .../storage_formats/base/query_compiler.py | 10 +++++++ .../storage_formats/hdk/query_compiler.py | 3 ++ modin/pandas/dataframe.py | 30 +++++++++++++++---- modin/pandas/series.py | 30 +++++++++++++++---- modin/pandas/test/dataframe/test_pickle.py | 30 +++++++++++++++++-- modin/pandas/test/test_series.py | 27 +++++++++++++++++ 10 files changed, 176 insertions(+), 14 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index d850cac69bd..4ac54be9e70 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3922,6 +3922,16 @@ def finalize(self): """ self._partition_mgr_cls.finalize(self._partitions) + def support_materialization_in_worker_process(self) -> bool: + """ + Whether it's possible to call function `to_pandas` during the pickling process, at the moment of recreating the object. + + Returns + ------- + bool + """ + return True + def __dataframe__(self, nan_as_null: bool = False, allow_copy: bool = True): """ Get a Modin DataFrame that implements the dataframe exchange protocol. diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index b7a4652b558..7c03e0a24f0 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -27,7 +27,13 @@ from modin.error_message import ErrorMessage from modin.core.storage_formats.pandas.utils import compute_chunksize from modin.core.dataframe.pandas.utils import concatenate -from modin.config import NPartitions, ProgressBar, BenchmarkMode +from modin.config import ( + NPartitions, + ProgressBar, + BenchmarkMode, + PersistentPickle, + Engine, +) from modin.logging import ClassLogger import os @@ -117,7 +123,20 @@ def preprocess_func(cls, map_func): `map_func` if the `apply` method of the `PandasDataframePartition` object you are using does not require any modification to a given function. """ - return cls._partition_class.preprocess_func(map_func) + old_value = PersistentPickle.get() + # When performing a function with Modin objects, it is more profitable to + # do the conversion to pandas once on the main process than several times + # on worker processes. 
Details: https://github.com/modin-project/modin/pull/6673/files#r1391086755 + # For Dask, otherwise there may be an error: `coroutine 'Client._gather' was never awaited` + need_update = not PersistentPickle.get() and Engine.get() != "Dask" + if need_update: + PersistentPickle.put(True) + try: + result = cls._partition_class.preprocess_func(map_func) + finally: + if need_update: + PersistentPickle.put(old_value) + return result # END Abstract Methods diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py b/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py index 78c40853ec1..755e5e6f170 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py @@ -40,3 +40,26 @@ class PandasOnDaskDataframe(PandasDataframe): """ _partition_mgr_cls = PandasOnDaskDataframePartitionManager + + @classmethod + def reconnect(cls, address, attributes): # noqa: GL08 + # The main goal is to configure the client for the worker process + # using the address passed by the custom `__reduce__` function + try: + from distributed import default_client + + default_client() + except ValueError: + from distributed import Client + + # setup `default_client` for worker process + _ = Client(address) + obj = cls.__new__(cls) + obj.__dict__.update(attributes) + return obj + + def __reduce__(self): # noqa: GL08 + from distributed import default_client + + address = default_client().scheduler_info()["address"] + return self.reconnect, (address, self.__dict__) diff --git a/modin/core/execution/unidist/implementations/pandas_on_unidist/dataframe/dataframe.py b/modin/core/execution/unidist/implementations/pandas_on_unidist/dataframe/dataframe.py index 66323820506..bc799aa2ce2 100644 --- a/modin/core/execution/unidist/implementations/pandas_on_unidist/dataframe/dataframe.py +++ b/modin/core/execution/unidist/implementations/pandas_on_unidist/dataframe/dataframe.py @@ -40,3 +40,7 @@ class PandasOnUnidistDataframe(PandasDataframe): """ _partition_mgr_cls = PandasOnUnidistDataframePartitionManager + + def support_materialization_in_worker_process(self) -> bool: + # more details why this is not `True` in https://github.com/modin-project/modin/pull/6673 + return False diff --git a/modin/core/storage_formats/base/query_compiler.py b/modin/core/storage_formats/base/query_compiler.py index b46611740f4..0f1ecc66b8b 100644 --- a/modin/core/storage_formats/base/query_compiler.py +++ b/modin/core/storage_formats/base/query_compiler.py @@ -313,6 +313,16 @@ def finalize(self): """Finalize constructing the dataframe calling all deferred functions which were used to build it.""" pass + def support_materialization_in_worker_process(self) -> bool: + """ + Whether it's possible to call function `to_pandas` during the pickling process, at the moment of recreating the object. 
+ + Returns + ------- + bool + """ + return self._modin_frame.support_materialization_in_worker_process() + # END Data Management Methods # To/From Pandas diff --git a/modin/experimental/core/storage_formats/hdk/query_compiler.py b/modin/experimental/core/storage_formats/hdk/query_compiler.py index 3d97f2c0887..7e2c449e93c 100644 --- a/modin/experimental/core/storage_formats/hdk/query_compiler.py +++ b/modin/experimental/core/storage_formats/hdk/query_compiler.py @@ -181,6 +181,9 @@ def finalize(self): # TODO: implement this for HDK storage format raise NotImplementedError() + def support_materialization_in_worker_process(self) -> bool: + return True + def to_pandas(self): return self._modin_frame.to_pandas() diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 6ae369079ef..f62c2ae03ff 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -14,6 +14,7 @@ """Module houses ``DataFrame`` class, that is distributed version of ``pandas.DataFrame``.""" from __future__ import annotations + import pandas from pandas.core.common import apply_if_callable, get_cython_func from pandas.core.computation.eval import _check_engine @@ -35,6 +36,7 @@ ) import datetime +import os import re import itertools import functools @@ -2978,7 +2980,7 @@ def _getitem(self, key): # Persistance support methods - BEGIN @classmethod - def _inflate_light(cls, query_compiler): + def _inflate_light(cls, query_compiler, source_pid): """ Re-creates the object from previously-serialized lightweight representation. @@ -2988,16 +2990,23 @@ def _inflate_light(cls, query_compiler): ---------- query_compiler : BaseQueryCompiler Query compiler to use for object re-creation. + source_pid : int + Determines whether a Modin or pandas object needs to be created. + Modin objects are created only on the main process. Returns ------- DataFrame New ``DataFrame`` based on the `query_compiler`. """ + if os.getpid() != source_pid: + return query_compiler.to_pandas() + # The current logic does not involve creating Modin objects + # and manipulation with them in worker processes return cls(query_compiler=query_compiler) @classmethod - def _inflate_full(cls, pandas_df): + def _inflate_full(cls, pandas_df, source_pid): """ Re-creates the object from previously-serialized disk-storable representation. @@ -3005,18 +3014,29 @@ def _inflate_full(cls, pandas_df): ---------- pandas_df : pandas.DataFrame Data to use for object re-creation. + source_pid : int + Determines whether a Modin or pandas object needs to be created. + Modin objects are created only on the main process. Returns ------- DataFrame New ``DataFrame`` based on the `pandas_df`. 
""" + if os.getpid() != source_pid: + return pandas_df + # The current logic does not involve creating Modin objects + # and manipulation with them in worker processes return cls(data=from_pandas(pandas_df)) def __reduce__(self): self._query_compiler.finalize() - if PersistentPickle.get(): - return self._inflate_full, (self._to_pandas(),) - return self._inflate_light, (self._query_compiler,) + pid = os.getpid() + if ( + PersistentPickle.get() + or not self._query_compiler.support_materialization_in_worker_process() + ): + return self._inflate_full, (self._to_pandas(), pid) + return self._inflate_light, (self._query_compiler, pid) # Persistance support methods - END diff --git a/modin/pandas/series.py b/modin/pandas/series.py index 88fb3454c76..aba2781d47a 100644 --- a/modin/pandas/series.py +++ b/modin/pandas/series.py @@ -14,6 +14,7 @@ """Module houses `Series` class, that is distributed version of `pandas.Series`.""" from __future__ import annotations + import numpy as np import pandas from pandas.io.formats.info import SeriesInfo @@ -29,6 +30,7 @@ from pandas._libs.lib import no_default, NoDefault from pandas._typing import IndexKeyFunc, Axis from typing import Union, Optional, Hashable, TYPE_CHECKING, IO +import os import warnings from modin.logging import disable_logging @@ -2401,7 +2403,7 @@ def _repartition(self): # Persistance support methods - BEGIN @classmethod - def _inflate_light(cls, query_compiler, name): + def _inflate_light(cls, query_compiler, name, source_pid): """ Re-creates the object from previously-serialized lightweight representation. @@ -2413,16 +2415,23 @@ def _inflate_light(cls, query_compiler, name): Query compiler to use for object re-creation. name : str The name to give to the new object. + source_pid : int + Determines whether a Modin or pandas object needs to be created. + Modin objects are created only on the main process. Returns ------- Series New Series based on the `query_compiler`. """ + if os.getpid() != source_pid: + return query_compiler.to_pandas() + # The current logic does not involve creating Modin objects + # and manipulation with them in worker processes return cls(query_compiler=query_compiler, name=name) @classmethod - def _inflate_full(cls, pandas_series): + def _inflate_full(cls, pandas_series, source_pid): """ Re-creates the object from previously-serialized disk-storable representation. @@ -2430,18 +2439,29 @@ def _inflate_full(cls, pandas_series): ---------- pandas_series : pandas.Series Data to use for object re-creation. + source_pid : int + Determines whether a Modin or pandas object needs to be created. + Modin objects are created only on the main process. Returns ------- Series New Series based on the `pandas_series`. 
""" + if os.getpid() != source_pid: + return pandas_series + # The current logic does not involve creating Modin objects + # and manipulation with them in worker processes return cls(data=pandas_series) def __reduce__(self): self._query_compiler.finalize() - if PersistentPickle.get(): - return self._inflate_full, (self._to_pandas(),) - return self._inflate_light, (self._query_compiler, self.name) + pid = os.getpid() + if ( + PersistentPickle.get() + or not self._query_compiler.support_materialization_in_worker_process() + ): + return self._inflate_full, (self._to_pandas(), pid) + return self._inflate_light, (self._query_compiler, self.name, pid) # Persistance support methods - END diff --git a/modin/pandas/test/dataframe/test_pickle.py b/modin/pandas/test/dataframe/test_pickle.py index 755c1775a8f..c58306784d9 100644 --- a/modin/pandas/test/dataframe/test_pickle.py +++ b/modin/pandas/test/dataframe/test_pickle.py @@ -17,8 +17,7 @@ import modin.pandas as pd from modin.config import PersistentPickle - -from modin.pandas.test.utils import df_equals +from modin.pandas.test.utils import create_test_dfs, df_equals @pytest.fixture @@ -47,6 +46,33 @@ def test_dataframe_pickle(modin_df, persistent): df_equals(modin_df, other) +def test__reduce__(): + # `DataFrame.__reduce__` will be called implicitly when lambda expressions are + # pre-processed for the distributed engine. + dataframe_data = ["Major League Baseball", "National Basketball Association"] + abbr_md, abbr_pd = create_test_dfs(dataframe_data, index=["MLB", "NBA"]) + # breakpoint() + + dataframe_data = { + "name": ["Mariners", "Lakers"] * 500, + "league_abbreviation": ["MLB", "NBA"] * 500, + } + teams_md, teams_pd = create_test_dfs(dataframe_data) + + result_md = ( + teams_md.set_index("name") + .league_abbreviation.apply(lambda abbr: abbr_md[0].loc[abbr]) + .rename("league") + ) + + result_pd = ( + teams_pd.set_index("name") + .league_abbreviation.apply(lambda abbr: abbr_pd[0].loc[abbr]) + .rename("league") + ) + df_equals(result_md, result_pd) + + def test_column_pickle(modin_column, modin_df, persistent): dmp = pickle.dumps(modin_column) other = pickle.loads(dmp) diff --git a/modin/pandas/test/test_series.py b/modin/pandas/test/test_series.py index 16c105997c7..d3c3ce5a06c 100644 --- a/modin/pandas/test/test_series.py +++ b/modin/pandas/test/test_series.py @@ -64,6 +64,7 @@ axis_values, bool_arg_keys, bool_arg_values, + create_test_dfs, int_arg_keys, int_arg_values, encoding_types, @@ -4834,3 +4835,29 @@ def test_binary_numpy_universal_function_issue_6483(): *create_test_series(test_data["float_nan_data"]), lambda series: np.arctan2(series, np.sin(series)), ) + + +def test__reduce__(): + # `Series.__reduce__` will be called implicitly when lambda expressions are + # pre-processed for the distributed engine. 
+ series_data = ["Major League Baseball", "National Basketball Association"] + abbr_md, abbr_pd = create_test_series(series_data, index=["MLB", "NBA"]) + + dataframe_data = { + "name": ["Mariners", "Lakers"] * 500, + "league_abbreviation": ["MLB", "NBA"] * 500, + } + teams_md, teams_pd = create_test_dfs(dataframe_data) + + result_md = ( + teams_md.set_index("name") + .league_abbreviation.apply(lambda abbr: abbr_md.loc[abbr]) + .rename("league") + ) + + result_pd = ( + teams_pd.set_index("name") + .league_abbreviation.apply(lambda abbr: abbr_pd.loc[abbr]) + .rename("league") + ) + df_equals(result_md, result_pd) From c7607cef395d66d763a8a3ccd8dfeed1bf3a862c Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Wed, 15 Nov 2023 14:20:04 +0100 Subject: [PATCH 32/32] pin unidist<=0.4.1 Signed-off-by: Anatoly Myachev --- requirements/env_unidist.yml | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/requirements/env_unidist.yml b/requirements/env_unidist.yml index eb5d098e06b..5a9351327cd 100644 --- a/requirements/env_unidist.yml +++ b/requirements/env_unidist.yml @@ -7,7 +7,7 @@ dependencies: # required dependencies - pandas>=2,<2.1 - numpy>=1.20.3 - - unidist-mpi>=0.2.1 + - unidist-mpi>=0.2.1,<=0.4.1 - fsspec>=2021.07.0 - packaging>=21.0 - psutil>=5.8.0 diff --git a/setup.py b/setup.py index 596e2f5fe44..a37c747223c 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ # ray==2.5.0 broken: https://github.com/conda-forge/ray-packages-feedstock/issues/100 # pydantic<2: https://github.com/modin-project/modin/issues/6336 ray_deps = ["ray[default]>=1.13.0,!=2.5.0", "pyarrow>=7.0.0", "pydantic<2"] -unidist_deps = ["unidist[mpi]>=0.2.1"] +unidist_deps = ["unidist[mpi]>=0.2.1,<=0.4.1"] spreadsheet_deps = ["modin-spreadsheet>=0.1.0"] sql_deps = ["dfsql>=0.4.2", "pyparsing<=2.4.7"] all_deps = dask_deps + ray_deps + unidist_deps + spreadsheet_deps
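
The `source_pid` plumbing introduced above only changes which branch `__reduce__` takes: in the process that created the object, unpickling still yields Modin objects, while a mismatched (worker) process receives plain pandas data instead. As a rough illustration, not part of any patch and relying only on the public pickle API plus the `PersistentPickle` config already shown in the diffs, both serialization paths can be exercised from the main process roughly like this:

    import pickle

    import modin.pandas as pd
    from modin.config import PersistentPickle

    df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})

    # Default ("light") path: _inflate_light carries the query compiler, so a
    # round trip inside the originating process yields a Modin DataFrame again.
    restored = pickle.loads(pickle.dumps(df))
    assert isinstance(restored, pd.DataFrame)
    assert restored.equals(df)

    # Persistent ("full") path: __reduce__ materializes to pandas up front and
    # reconstructs through _inflate_full; the same path is forced whenever the
    # query compiler reports no support for materialization in worker processes.
    old_value = PersistentPickle.get()
    PersistentPickle.put(True)
    try:
        restored_full = pickle.loads(pickle.dumps(df))
        assert restored_full.equals(df)
    finally:
        PersistentPickle.put(old_value)  # restore the previous setting

This mirrors the `test__reduce__` cases added in the patches: lambdas shipped to the distributed engine trigger `__reduce__` implicitly, and the recorded PID decides whether the receiving side rebuilds a Modin object or falls back to pandas.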