diff --git a/docs/conf.py b/docs/conf.py index 39aad78039b..d140161419d 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -176,6 +176,7 @@ def noop_decorator(*args, **kwargs): "icon": "fas fa-envelope-square", }, ], + "navigation_with_keys": True, } # Custom sidebar templates, must be a dictionary that maps document names diff --git a/modin/conftest.py b/modin/conftest.py index 28c91547dc0..8bd218c4df6 100644 --- a/modin/conftest.py +++ b/modin/conftest.py @@ -712,3 +712,28 @@ def s3_resource(s3_base): if not cli.list_buckets()["Buckets"]: break time.sleep(0.1) + + +@pytest.fixture +def modify_config(request): + values = request.param + old_values = {} + + for key, value in values.items(): + old_values[key] = key.get() + key.put(value) + + yield # waiting for the test to be completed + # restoring old parameters + for key, value in old_values.items(): + try: + key.put(value) + except ValueError as e: + # sometimes bool env variables have 'None' as a default value, which + # causes a ValueError when we try to set this value back, as technically, + # only bool values are allowed (and 'None' is not a bool), in this case + # we try to set 'False' instead + if key.type == bool and value is None: + key.put(False) + else: + raise e diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index fda5ad7b4a1..4ac54be9e70 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -31,7 +31,7 @@ from pandas._libs.lib import no_default from typing import List, Hashable, Optional, Callable, Union, Dict, TYPE_CHECKING -from modin.config import Engine +from modin.config import Engine, IsRayCluster from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler from modin.core.storage_formats.pandas.utils import get_length_list from modin.error_message import ErrorMessage @@ -2265,7 +2265,7 @@ def combine_and_apply( ) def _apply_func_to_range_partitioning( - self, key_column, func, ascending=True, **kwargs + self, key_column, func, ascending=True, preserve_columns=False, **kwargs ): """ Reshuffle data so it would be range partitioned and then apply the passed function row-wise. @@ -2278,6 +2278,8 @@ def _apply_func_to_range_partitioning( Function to apply against partitions. ascending : bool, default: True Whether the range should be built in ascending or descending order. + preserve_columns : bool, default: False + If the columns cache should be preserved (specify this flag if `func` doesn't change column labels). **kwargs : dict Additional arguments to forward to the range builder function. 
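# --- Illustrative sketch (not part of the diff): how the new `modify_config` fixture from
# modin/conftest.py above is meant to be used via indirect parametrization. The test name and
# the chosen config class (PersistentPickle) are assumptions made for this example only.
import pytest
from modin.config import PersistentPickle

@pytest.mark.parametrize("modify_config", [{PersistentPickle: True}], indirect=True)
def test_with_persistent_pickle(modify_config):
    # The config value is already applied here; the fixture restores the previous
    # value (or False for bool configs whose default was None) once the test finishes.
    assert PersistentPickle.get()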
@@ -2288,7 +2290,14 @@ def _apply_func_to_range_partitioning( """ # If there's only one row partition can simply apply the function row-wise without the need to reshuffle if self._partitions.shape[0] == 1: - return self.apply_full_axis(axis=1, func=func) + result = self.apply_full_axis( + axis=1, + func=func, + new_columns=self.copy_columns_cache() if preserve_columns else None, + ) + if preserve_columns: + result._set_axis_lengths_cache(self._column_widths_cache, axis=1) + return result ideal_num_new_partitions = len(self._partitions) m = len(self.index) / ideal_num_new_partitions @@ -2311,12 +2320,12 @@ def _apply_func_to_range_partitioning( # simply combine all partitions and apply the sorting to the whole dataframe return self.combine_and_apply(func=func) - if self.dtypes[key_column] == object: + if is_numeric_dtype(self.dtypes[key_column]): + method = "linear" + else: # This means we are not sorting numbers, so we need our quantiles to not try # arithmetic on the values. method = "inverted_cdf" - else: - method = "linear" shuffling_functions = build_sort_functions( self, @@ -2365,7 +2374,14 @@ def _apply_func_to_range_partitioning( func, ) - return self.__constructor__(new_partitions) + result = self.__constructor__(new_partitions) + if preserve_columns: + result.set_columns_cache(self.copy_columns_cache()) + # We perform the final steps of the sort on full axis partitions, so we know that the + # length of each partition is the full length of the dataframe. + if self.has_materialized_columns: + result._set_axis_lengths_cache([len(self.columns)], axis=1) + return result @lazy_metadata_decorator(apply_axis="both") def sort_by( @@ -2422,15 +2438,13 @@ def sort_function(df): # pragma: no cover ) result = self._apply_func_to_range_partitioning( - key_column=columns[0], func=sort_function, ascending=ascending, **kwargs + key_column=columns[0], + func=sort_function, + ascending=ascending, + preserve_columns=True, + **kwargs, ) - - result.set_axis_cache(self.copy_axis_cache(axis.value ^ 1), axis=axis.value ^ 1) result.set_dtypes_cache(self.copy_dtypes_cache()) - # We perform the final steps of the sort on full axis partitions, so we know that the - # length of each partition is the full length of the dataframe. - if self.has_materialized_columns: - self._set_axis_lengths_cache([len(self.columns)], axis=axis.value ^ 1) if kwargs.get("ignore_index", False): result.index = RangeIndex(len(self.get_axis(axis.value))) @@ -3495,6 +3509,7 @@ def groupby( by: Union[str, List[str]], operator: Callable, result_schema: Optional[Dict[Hashable, type]] = None, + align_result_columns=False, **kwargs: dict, ) -> "PandasDataframe": """ @@ -3512,6 +3527,10 @@ def groupby( on the output desired by the user. result_schema : dict, optional Mapping from column labels to data types that represents the types of the output dataframe. + align_result_columns : bool, default: False + Whether to manually align columns between all the resulted row partitions. + This flag is helpful when dealing with UDFs as they can change the partition's shape + and labeling unpredictably, resulting in an invalid dataframe. **kwargs : dict Additional arguments to pass to the ``df.groupby`` method (besides the 'by' argument). 
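# --- Illustrative sketch (not part of the patch): the situation `align_result_columns` guards
# against. A UDF passed to groupby.apply() may return differently-labeled frames per group, so
# row partitions computed independently must have their columns aligned afterwards; plain pandas
# outer-joins the per-group columns when it concatenates the results, as shown here.
import pandas

df = pandas.DataFrame({"g": [0, 0, 1], "x": [1, 2, 3]})
res = df.groupby("g").apply(
    lambda grp: pandas.DataFrame({"a": [1]})
    if grp.name == 0
    else pandas.DataFrame({"b": [2]})
)
print(res.columns.tolist())  # ['a', 'b'] -- the union of the per-group columns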
@@ -3541,19 +3560,118 @@ def groupby( if not isinstance(by, list): by = [by] + skip_on_aligning_flag = "__skip_me_on_aligning__" + def apply_func(df): # pragma: no cover if any(is_categorical_dtype(dtype) for dtype in df.dtypes[by].values): raise NotImplementedError( "Reshuffling groupby is not yet supported when grouping on a categorical column. " + "https://github.com/modin-project/modin/issues/5925" ) - return operator(df.groupby(by, **kwargs)) + result = operator(df.groupby(by, **kwargs)) + if ( + align_result_columns + and df.empty + and result.empty + and df.columns.equals(result.columns) + ): + # We want to align columns only of those frames that actually performed + # some groupby aggregation, if an empty frame was originally passed + # (an empty bin on reshuffling was created) then there were no groupby + # executed over this partition and so it has incorrect columns + # that shouldn't be considered on the aligning phase + result.attrs[skip_on_aligning_flag] = True + return result result = self._apply_func_to_range_partitioning( key_column=by[0], func=apply_func, ) + # no need aligning columns if there's only one row partition + if align_result_columns and result._partitions.shape[0] > 1: + # FIXME: the current reshuffling implementation guarantees us that there's only one column + # partition in the result, so we should never hit this exception for now, however + # in the future, we might want to make this implementation more broader + if result._partitions.shape[1] > 1: + raise NotImplementedError( + "Aligning columns is not yet implemented for multiple column partitions." + ) + + # There're two implementations: + # 1. The first one work faster, but may stress the network a lot in cluster mode since + # it gathers all the dataframes in a single ray-kernel. + # 2. The second one works slower, but only gathers light pandas.Index objects, + # so there should be less stress on the network. 
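# --- Standalone sketch (not part of the groupby implementation below): the column-joining trick
# both branches rely on. Concatenating zero-row slices outer-joins the column labels the same way
# pandas does when combining results of different groups; each row partition is then reindexed to
# that common set of columns.
import pandas

part1 = pandas.DataFrame({"a": [1], "b": [2]})
part2 = pandas.DataFrame({"b": [3], "c": [4]})
aligned = pandas.concat([df.iloc[:0] for df in (part1, part2)], axis=0, join="outer").columns
part1, part2 = (df.reindex(columns=aligned) for df in (part1, part2))
print(list(aligned))  # ['a', 'b', 'c']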
+ if not IsRayCluster.get(): + + def compute_aligned_columns(*dfs): + """Take row partitions, filter empty ones, and return joined columns for them.""" + valid_dfs = [ + df + for df in dfs + if not df.attrs.get(skip_on_aligning_flag, False) + ] + if len(valid_dfs) == 0 and len(dfs) != 0: + valid_dfs = dfs + + # Using '.concat()' on empty-slices instead of 'Index.join()' + # in order to get identical behavior to pandas when it joins + # results of different groups + return pandas.concat( + [df.iloc[:0] for df in valid_dfs], axis=0, join="outer" + ).columns + + # Passing all partitions to the 'compute_aligned_columns' kernel to get + # aligned columns + parts = result._partitions.flatten() + aligned_columns = parts[0].apply( + compute_aligned_columns, *[part._data for part in parts[1:]] + ) + + # Lazily applying aligned columns to partitions + new_partitions = self._partition_mgr_cls.lazy_map_partitions( + result._partitions, + lambda df, columns: df.reindex(columns=columns), + func_args=(aligned_columns._data,), + ) + else: + + def join_cols(df, *cols): + """Join `cols` and apply the joined columns to `df`.""" + valid_cols = [ + pandas.DataFrame(columns=col) for col in cols if col is not None + ] + if len(valid_cols) == 0: + return df + # Using '.concat()' on empty-slices instead of 'Index.join()' + # in order to get identical behavior to pandas when it joins + # results of different groups + result_col = pandas.concat(valid_cols, axis=0, join="outer").columns + return df.reindex(columns=result_col) + + # Getting futures for columns of non-empty partitions + cols = [ + part.apply( + lambda df: None + if df.attrs.get(skip_on_aligning_flag, False) + else df.columns + )._data + for part in result._partitions.flatten() + ] + + # Lazily joining and applying the aligned columns + new_partitions = self._partition_mgr_cls.lazy_map_partitions( + result._partitions, + join_cols, + func_args=cols, + ) + result = self.__constructor__( + new_partitions, + index=result.copy_index_cache(), + row_lengths=result._row_lengths_cache, + ) + if result_schema is not None: new_dtypes = pandas.Series(result_schema) @@ -3804,6 +3922,16 @@ def finalize(self): """ self._partition_mgr_cls.finalize(self._partitions) + def support_materialization_in_worker_process(self) -> bool: + """ + Whether it's possible to call function `to_pandas` during the pickling process, at the moment of recreating the object. + + Returns + ------- + bool + """ + return True + def __dataframe__(self, nan_as_null: bool = False, allow_copy: bool = True): """ Get a Modin DataFrame that implements the dataframe exchange protocol. 
diff --git a/modin/core/dataframe/pandas/interchange/dataframe_protocol/column.py b/modin/core/dataframe/pandas/interchange/dataframe_protocol/column.py index 28aff46a7b7..08185f38485 100644 --- a/modin/core/dataframe/pandas/interchange/dataframe_protocol/column.py +++ b/modin/core/dataframe/pandas/interchange/dataframe_protocol/column.py @@ -251,7 +251,7 @@ def reduce_func(df): # Otherwise, we get mismatching internal and external indices for both axes intermediate_df.index = pandas.RangeIndex(1) intermediate_df.columns = pandas.RangeIndex(1) - self._null_count_cache = intermediate_df.to_pandas().squeeze() + self._null_count_cache = intermediate_df.to_pandas().squeeze(axis=1).item() return self._null_count_cache @property @@ -411,15 +411,18 @@ def _get_validity_buffer(self) -> Tuple[PandasProtocolBuffer, Any]: buf = self._col.to_numpy().flatten() # Determine the encoding for valid values - valid = 1 if invalid == 0 else 0 + valid = invalid == 0 + invalid = not valid - mask = [valid if type(buf[i]) is str else invalid for i in range(buf.size)] + mask = np.empty(shape=(len(buf),), dtype=np.bool_) + for i, obj in enumerate(buf): + mask[i] = valid if isinstance(obj, str) else invalid # Convert the mask array to a Pandas "buffer" using a NumPy array as the backing store - buffer = PandasProtocolBuffer(np.asarray(mask, dtype="uint8")) + buffer = PandasProtocolBuffer(mask) # Define the dtype of the returned buffer - dtype = (DTypeKind.UINT, 8, "C", "=") + dtype = (DTypeKind.BOOL, 8, "b", "=") self._validity_buffer_cache = (buffer, dtype) return self._validity_buffer_cache diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index 7dce9702f22..7c03e0a24f0 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -27,7 +27,13 @@ from modin.error_message import ErrorMessage from modin.core.storage_formats.pandas.utils import compute_chunksize from modin.core.dataframe.pandas.utils import concatenate -from modin.config import NPartitions, ProgressBar, BenchmarkMode +from modin.config import ( + NPartitions, + ProgressBar, + BenchmarkMode, + PersistentPickle, + Engine, +) from modin.logging import ClassLogger import os @@ -117,7 +123,20 @@ def preprocess_func(cls, map_func): `map_func` if the `apply` method of the `PandasDataframePartition` object you are using does not require any modification to a given function. """ - return cls._partition_class.preprocess_func(map_func) + old_value = PersistentPickle.get() + # When performing a function with Modin objects, it is more profitable to + # do the conversion to pandas once on the main process than several times + # on worker processes. Details: https://github.com/modin-project/modin/pull/6673/files#r1391086755 + # For Dask, otherwise there may be an error: `coroutine 'Client._gather' was never awaited` + need_update = not PersistentPickle.get() and Engine.get() != "Dask" + if need_update: + PersistentPickle.put(True) + try: + result = cls._partition_class.preprocess_func(map_func) + finally: + if need_update: + PersistentPickle.put(old_value) + return result # END Abstract Methods @@ -530,7 +549,7 @@ def map_partitions(cls, partitions, map_func): @classmethod @wait_computations_if_benchmark_mode - def lazy_map_partitions(cls, partitions, map_func): + def lazy_map_partitions(cls, partitions, map_func, func_args=None): """ Apply `map_func` to every partition in `partitions` *lazily*. 
@@ -540,6 +559,8 @@ def lazy_map_partitions(cls, partitions, map_func): Partitions of Modin Frame. map_func : callable Function to apply. + func_args : iterable, optional + Positional arguments for the 'map_func'. Returns ------- @@ -549,7 +570,13 @@ def lazy_map_partitions(cls, partitions, map_func): preprocessed_map_func = cls.preprocess_func(map_func) return np.array( [ - [part.add_to_apply_calls(preprocessed_map_func) for part in row] + [ + part.add_to_apply_calls( + preprocessed_map_func, + *(tuple() if func_args is None else func_args), + ) + for part in row + ] for row in partitions ] ) diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py b/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py index 78c40853ec1..755e5e6f170 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py @@ -40,3 +40,26 @@ class PandasOnDaskDataframe(PandasDataframe): """ _partition_mgr_cls = PandasOnDaskDataframePartitionManager + + @classmethod + def reconnect(cls, address, attributes): # noqa: GL08 + # The main goal is to configure the client for the worker process + # using the address passed by the custom `__reduce__` function + try: + from distributed import default_client + + default_client() + except ValueError: + from distributed import Client + + # setup `default_client` for worker process + _ = Client(address) + obj = cls.__new__(cls) + obj.__dict__.update(attributes) + return obj + + def __reduce__(self): # noqa: GL08 + from distributed import default_client + + address = default_client().scheduler_info()["address"] + return self.reconnect, (address, self.__dict__) diff --git a/modin/core/execution/ray/common/utils.py b/modin/core/execution/ray/common/utils.py index 845159eb0ae..7c27221a76a 100644 --- a/modin/core/execution/ray/common/utils.py +++ b/modin/core/execution/ray/common/utils.py @@ -287,3 +287,111 @@ def deserialize(obj): # pragma: no cover return dict(zip(obj.keys(), RayWrapper.materialize(list(obj.values())))) else: return obj + + +def deconstruct_call_queue(call_queue): + """ + Deconstruct the passed call queue into a 1D list. + + This is required, so the call queue can be then passed to a Ray's kernel + as a variable-length argument ``kernel(*queue)`` so the Ray engine + automatically materialize all the futures that the queue might have contained. + + Parameters + ---------- + call_queue : list[list[func, args, kwargs], ...] + + Returns + ------- + num_funcs : int + The number of functions in the call queue. + arg_lengths : list of ints + The number of positional arguments for each function in the call queue. + kw_key_lengths : list of ints + The number of key-word arguments for each function in the call queue. + kw_value_lengths : 2D list of dict{"len": int, "was_iterable": bool} + Description of keyword arguments for each function. For example, `kw_value_lengths[i][j]` + describes the j-th keyword argument of the i-th function in the call queue. + The describtion contains of the lengths of the argument and whether it's a list at all + (for example, {"len": 1, "was_iterable": False} describes a non-list argument). + unfolded_queue : list + A 1D call queue that can be reconstructed using ``reconstruct_call_queue`` function. 
+ """ + num_funcs = len(call_queue) + arg_lengths = [] + kw_key_lengths = [] + kw_value_lengths = [] + unfolded_queue = [] + for call in call_queue: + unfolded_queue.append(call[0]) + unfolded_queue.extend(call[1]) + arg_lengths.append(len(call[1])) + # unfold keyword dict + ## unfold keys + unfolded_queue.extend(call[2].keys()) + kw_key_lengths.append(len(call[2])) + ## unfold values + value_lengths = [] + for value in call[2].values(): + was_iterable = True + if not isinstance(value, (list, tuple)): + was_iterable = False + value = (value,) + unfolded_queue.extend(value) + value_lengths.append({"len": len(value), "was_iterable": was_iterable}) + kw_value_lengths.append(value_lengths) + + return num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, *unfolded_queue + + +def reconstruct_call_queue( + num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, unfolded_queue +): + """ + Reconstruct original call queue from the result of the ``deconstruct_call_queue()``. + + Parameters + ---------- + num_funcs : int + The number of functions in the call queue. + arg_lengths : list of ints + The number of positional arguments for each function in the call queue. + kw_key_lengths : list of ints + The number of key-word arguments for each function in the call queue. + kw_value_lengths : 2D list of dict{"len": int, "was_iterable": bool} + Description of keyword arguments for each function. For example, `kw_value_lengths[i][j]` + describes the j-th keyword argument of the i-th function in the call queue. + The describtion contains of the lengths of the argument and whether it's a list at all + (for example, {"len": 1, "was_iterable": False} describes a non-list argument). + unfolded_queue : list + A 1D call queue that is result of the ``deconstruct_call_queue()`` function. + + Returns + ------- + list[list[func, args, kwargs], ...] + Original call queue. 
+ """ + items_took = 0 + + def take_n_items(n): + nonlocal items_took + res = unfolded_queue[items_took : items_took + n] + items_took += n + return res + + call_queue = [] + for i in range(num_funcs): + func = take_n_items(1)[0] + args = take_n_items(arg_lengths[i]) + kw_keys = take_n_items(kw_key_lengths[i]) + kwargs = {} + value_lengths = kw_value_lengths[i] + for key, value_length in zip(kw_keys, value_lengths): + vals = take_n_items(value_length["len"]) + if value_length["len"] == 1 and not value_length["was_iterable"]: + vals = vals[0] + kwargs[key] = vals + + call_queue.append((func, args, kwargs)) + + return call_queue diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py index a7caabf2f9d..7326d755dbb 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py @@ -16,7 +16,11 @@ import ray from ray.util import get_node_ip_address -from modin.core.execution.ray.common.utils import deserialize, ObjectIDType +from modin.core.execution.ray.common.utils import ( + ObjectIDType, + deconstruct_call_queue, + reconstruct_call_queue, +) from modin.core.execution.ray.common import RayWrapper from modin.core.dataframe.pandas.partitioning.partition import PandasDataframePartition from modin.pandas.indexing import compute_sliced_len @@ -97,7 +101,9 @@ def apply(self, func, *args, **kwargs): self._is_debug(log) and log.debug( f"SUBMIT::_apply_list_of_funcs::{self._identity}" ) - result, length, width, ip = _apply_list_of_funcs.remote(call_queue, data) + result, length, width, ip = _apply_list_of_funcs.remote( + data, *deconstruct_call_queue(call_queue) + ) else: # We handle `len(call_queue) == 1` in a different way because # this dramatically improves performance. @@ -128,7 +134,7 @@ def drain_call_queue(self): new_length, new_width, self._ip_cache, - ) = _apply_list_of_funcs.remote(call_queue, data) + ) = _apply_list_of_funcs.remote(data, *deconstruct_call_queue(call_queue)) else: # We handle `len(call_queue) == 1` in a different way because # this dramatically improves performance. @@ -391,16 +397,29 @@ def _apply_func(partition, func, *args, **kwargs): # pragma: no cover @ray.remote(num_returns=4) -def _apply_list_of_funcs(call_queue, partition): # pragma: no cover +def _apply_list_of_funcs( + partition, num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, *futures +): # pragma: no cover """ Execute all operations stored in the call queue on the partition in a worker process. Parameters ---------- - call_queue : list - A call queue that needs to be executed on the partition. partition : pandas.DataFrame A pandas DataFrame the call queue needs to be executed on. + num_funcs : int + The number of functions in the call queue. + arg_lengths : list of ints + The number of positional arguments for each function in the call queue. + kw_key_lengths : list of ints + The number of key-word arguments for each function in the call queue. + kw_value_lengths : 2D list of dict{"len": int, "was_iterable": bool} + Description of keyword arguments for each function. For example, `kw_value_lengths[i][j]` + describes the j-th keyword argument of the i-th function in the call queue. + The describtion contains of the lengths of the argument and whether it's a list at all + (for example, {"len": 1, "was_iterable": False} describes a non-list argument). 
+ *futures : list + A 1D call queue that is result of the ``deconstruct_call_queue()`` function. Returns ------- @@ -413,10 +432,10 @@ def _apply_list_of_funcs(call_queue, partition): # pragma: no cover str The node IP address of the worker process. """ - for func, f_args, f_kwargs in call_queue: - func = deserialize(func) - args = deserialize(f_args) - kwargs = deserialize(f_kwargs) + call_queue = reconstruct_call_queue( + num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, futures + ) + for func, args, kwargs in call_queue: try: partition = func(partition, *args, **kwargs) # Sometimes Arrow forces us to make a copy of an object before we operate on it. We diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py index f81290faf24..816b1b717a7 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py @@ -20,8 +20,8 @@ from modin.core.dataframe.pandas.partitioning.axis_partition import ( PandasDataframeAxisPartition, ) -from modin.core.execution.ray.common.utils import deserialize from modin.core.execution.ray.common import RayWrapper + from .partition import PandasOnRayDataframePartition from modin.utils import _inherit_docstrings @@ -115,12 +115,13 @@ def deploy_splitting_func( else num_splits, ).remote( cls._get_deploy_split_func(), - axis, - func, - f_args, - f_kwargs, + *f_args, num_splits, *partitions, + axis=axis, + f_to_deploy=func, + f_len_args=len(f_args), + f_kwargs=f_kwargs, extract_metadata=extract_metadata, ) @@ -176,13 +177,14 @@ def deploy_axis_func( **({"max_retries": max_retries} if max_retries is not None else {}), ).remote( cls._get_deploy_axis_func(), - axis, - func, - f_args, - f_kwargs, + *f_args, num_splits, maintain_partitioning, *partitions, + axis=axis, + f_to_deploy=func, + f_len_args=len(f_args), + f_kwargs=f_kwargs, manual_partition=manual_partition, lengths=lengths, ) @@ -231,14 +233,15 @@ def deploy_func_between_two_axis_partitions( num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN) ).remote( PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions, - axis, - func, - f_args, - f_kwargs, + *f_args, num_splits, len_of_left, other_shape, *partitions, + axis=axis, + f_to_deploy=func, + f_len_args=len(f_args), + f_kwargs=f_kwargs, ) def wait(self): @@ -261,11 +264,11 @@ class PandasOnRayDataframeRowPartition(PandasOnRayDataframeVirtualPartition): @ray.remote def _deploy_ray_func( deployer, + *positional_args, axis, f_to_deploy, - f_args, + f_len_args, f_kwargs, - *args, extract_metadata=True, **kwargs, ): # pragma: no cover @@ -274,29 +277,32 @@ def _deploy_ray_func( This is ALWAYS called on either ``PandasDataframeAxisPartition.deploy_axis_func`` or ``PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions``, which both - serve to deploy another dataframe function on a Ray worker process. The provided ``f_args`` - is thus are deserialized here (on the Ray worker) before the function is called (``f_kwargs`` - will never contain more Ray objects, and thus does not require deserialization). + serve to deploy another dataframe function on a Ray worker process. The provided `positional_args` + contains positional arguments for both: `deployer` and for `f_to_deploy`, the parameters can be separated + using the `f_len_args` value. 
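# --- Minimal sketch (values are illustrative, not from the patch) of how the combined positional
# arguments are split again inside the kernel, as implemented just below: the first `f_len_args`
# items belong to `f_to_deploy`, the remainder are forwarded to `deployer`.
positional_args = ("f_arg_0", "f_arg_1", "num_splits", "partition_0", "partition_1")
f_len_args = 2
f_args = positional_args[:f_len_args]       # -> ("f_arg_0", "f_arg_1")
deploy_args = positional_args[f_len_args:]  # -> ("num_splits", "partition_0", "partition_1")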
The parameters are combined so they will be deserialized by Ray before the + kernel is executed (`f_kwargs` will never contain more Ray objects, and thus does not require deserialization). Parameters ---------- deployer : callable A `PandasDataFrameAxisPartition.deploy_*` method that will call ``f_to_deploy``. + *positional_args : list + The first `f_len_args` elements in this list represent positional arguments + to pass to the `f_to_deploy`. The rest are positional arguments that will be + passed to `deployer`. axis : {0, 1} - The axis to perform the function along. + The axis to perform the function along. This argument is keyword only. f_to_deploy : callable or RayObjectID - The function to deploy. - f_args : list or tuple - Positional arguments to pass to ``f_to_deploy``. + The function to deploy. This argument is keyword only. + f_len_args : int + Number of positional arguments to pass to ``f_to_deploy``. This argument is keyword only. f_kwargs : dict - Keyword arguments to pass to ``f_to_deploy``. - *args : list - Positional arguments to pass to ``deployer``. + Keyword arguments to pass to ``f_to_deploy``. This argument is keyword only. extract_metadata : bool, default: True Whether to return metadata (length, width, ip) of the result. Passing `False` may relax the load on object storage as the remote function would return 4 times fewer futures. Passing `False` makes sense for temporary results where you know for sure that the - metadata will never be requested. + metadata will never be requested. This argument is keyword only. **kwargs : dict Keyword arguments to pass to ``deployer``. @@ -309,8 +315,9 @@ def _deploy_ray_func( ----- Ray functions are not detected by codecov (thus pragma: no cover). """ - f_args = deserialize(f_args) - result = deployer(axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs) + f_args = positional_args[:f_len_args] + deploy_args = positional_args[f_len_args:] + result = deployer(axis, f_to_deploy, f_args, f_kwargs, *deploy_args, **kwargs) if not extract_metadata: return result ip = get_node_ip_address() diff --git a/modin/core/execution/unidist/implementations/pandas_on_unidist/dataframe/dataframe.py b/modin/core/execution/unidist/implementations/pandas_on_unidist/dataframe/dataframe.py index 66323820506..bc799aa2ce2 100644 --- a/modin/core/execution/unidist/implementations/pandas_on_unidist/dataframe/dataframe.py +++ b/modin/core/execution/unidist/implementations/pandas_on_unidist/dataframe/dataframe.py @@ -40,3 +40,7 @@ class PandasOnUnidistDataframe(PandasDataframe): """ _partition_mgr_cls = PandasOnUnidistDataframePartitionManager + + def support_materialization_in_worker_process(self) -> bool: + # more details why this is not `True` in https://github.com/modin-project/modin/pull/6673 + return False diff --git a/modin/core/io/file_dispatcher.py b/modin/core/io/file_dispatcher.py index e201fd6b324..0d0cd2a0a7f 100644 --- a/modin/core/io/file_dispatcher.py +++ b/modin/core/io/file_dispatcher.py @@ -89,7 +89,7 @@ def __enter__(self): PermissionError, ) except ModuleNotFoundError: - credential_error_type = () + credential_error_type = (PermissionError,) args = (self.file_path, self.mode, self.compression) @@ -257,11 +257,21 @@ def file_exists(cls, file_path, storage_options=None): if not is_fsspec_url(file_path) and not is_url(file_path): return os.path.exists(file_path) - from botocore.exceptions import ( - NoCredentialsError, - EndpointConnectionError, - ConnectTimeoutError, - ) + try: + from botocore.exceptions import ( + NoCredentialsError, + 
EndpointConnectionError, + ConnectTimeoutError, + ) + + credential_error_type = ( + NoCredentialsError, + PermissionError, + EndpointConnectionError, + ConnectTimeoutError, + ) + except ModuleNotFoundError: + credential_error_type = (PermissionError,) if storage_options is not None: new_storage_options = dict(storage_options) @@ -273,12 +283,7 @@ def file_exists(cls, file_path, storage_options=None): exists = False try: exists = fs.exists(file_path) - except ( - NoCredentialsError, - PermissionError, - EndpointConnectionError, - ConnectTimeoutError, - ): + except credential_error_type: fs, _ = fsspec.core.url_to_fs(file_path, anon=True, **new_storage_options) exists = fs.exists(file_path) diff --git a/modin/core/io/io.py b/modin/core/io/io.py index f79ce04f9b2..30c0e186661 100644 --- a/modin/core/io/io.py +++ b/modin/core/io/io.py @@ -28,6 +28,7 @@ from modin.error_message import ErrorMessage from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler from modin.utils import _inherit_docstrings +from modin.pandas.io import ExcelFile _doc_default_io_method = """ {summary} using pandas. @@ -277,6 +278,12 @@ def read_clipboard(cls, sep=r"\s+", **kwargs): # pragma: no cover # noqa: PR01 ) def read_excel(cls, **kwargs): # noqa: PR01 ErrorMessage.default_to_pandas("`read_excel`") + if isinstance(kwargs["io"], ExcelFile): + # otherwise, Modin objects may be passed to the pandas context, resulting + # in undefined behavior + # for example in the case: pd.read_excel(pd.ExcelFile), since reading from + # pd.ExcelFile in `read_excel` isn't supported + kwargs["io"]._set_pandas_mode() intermediate = pandas.read_excel(**kwargs) if isinstance(intermediate, (OrderedDict, dict)): parsed = type(intermediate)() diff --git a/modin/core/io/text/excel_dispatcher.py b/modin/core/io/text/excel_dispatcher.py index 4fd40b6424f..71a9745910f 100644 --- a/modin/core/io/text/excel_dispatcher.py +++ b/modin/core/io/text/excel_dispatcher.py @@ -13,12 +13,16 @@ """Module houses `ExcelDispatcher` class, that is used for reading excel files.""" +import os +from io import BytesIO + import pandas import re import warnings from modin.core.io.text.text_file_dispatcher import TextFileDispatcher from modin.config import NPartitions +from modin.pandas.io import ExcelFile EXCEL_READ_BLOCK_SIZE = 4096 @@ -55,6 +59,29 @@ def _read(cls, io, **kwargs): **kwargs ) + if kwargs.get("skiprows") is not None: + return cls.single_worker_read( + io, + reason="Modin doesn't support 'skiprows' parameter of `read_excel`", + **kwargs + ) + + if isinstance(io, bytes): + io = BytesIO(io) + + # isinstance(ExcelFile, os.PathLike) == True + if not isinstance(io, (str, os.PathLike, BytesIO)) or isinstance( + io, (ExcelFile, pandas.ExcelFile) + ): + if isinstance(io, ExcelFile): + io._set_pandas_mode() + return cls.single_worker_read( + io, + reason="Modin only implements parallel `read_excel` the following types of `io`: " + + "str, os.PathLike, io.BytesIO.", + **kwargs + ) + from zipfile import ZipFile from openpyxl.worksheet.worksheet import Worksheet from openpyxl.worksheet._reader import WorksheetReader @@ -79,7 +106,7 @@ def _read(cls, io, **kwargs): # NOTE: ExcelReader() in read-only mode does not close file handle by itself # work around that by passing file object if we received some path - io_file = open(io, "rb") if isinstance(io, str) else io + io_file = open(io, "rb") if isinstance(io, (str, os.PathLike)) else io try: ex = ExcelReader(io_file, read_only=True) ex.read() @@ -90,14 +117,12 @@ def _read(cls, io, **kwargs): 
ex.read_strings() ws = Worksheet(wb) finally: - if isinstance(io, str): + if isinstance(io, (str, os.PathLike)): # close only if it were us who opened the object io_file.close() pandas_kw = dict(kwargs) # preserve original kwargs with ZipFile(io) as z: - from io import BytesIO - # Convert index to sheet name in file if isinstance(sheet_name, int): sheet_name = "sheet{}".format(sheet_name + 1) diff --git a/modin/core/io/text/text_file_dispatcher.py b/modin/core/io/text/text_file_dispatcher.py index 158212b37ec..75f05356bb1 100644 --- a/modin/core/io/text/text_file_dispatcher.py +++ b/modin/core/io/text/text_file_dispatcher.py @@ -683,6 +683,9 @@ def check_parameters_support( if read_kwargs["chunksize"] is not None: return (False, "`chunksize` parameter is not supported") + if read_kwargs.get("iterator"): + return (False, "`iterator==True` parameter is not supported") + if read_kwargs.get("dialect") is not None: return (False, "`dialect` parameter is not supported") diff --git a/modin/core/storage_formats/base/query_compiler.py b/modin/core/storage_formats/base/query_compiler.py index 828dea2f9a1..0f1ecc66b8b 100644 --- a/modin/core/storage_formats/base/query_compiler.py +++ b/modin/core/storage_formats/base/query_compiler.py @@ -313,6 +313,16 @@ def finalize(self): """Finalize constructing the dataframe calling all deferred functions which were used to build it.""" pass + def support_materialization_in_worker_process(self) -> bool: + """ + Whether it's possible to call function `to_pandas` during the pickling process, at the moment of recreating the object. + + Returns + ------- + bool + """ + return self._modin_frame.support_materialization_in_worker_process() + # END Data Management Methods # To/From Pandas @@ -2270,6 +2280,25 @@ def nsmallest(self, n=5, columns=None, keep="first"): self, n=n, columns=columns, keep=keep ) + @doc_utils.add_refer_to("DataFrame.query") + def rowwise_query(self, expr, **kwargs): + """ + Query columns of the QueryCompiler with a boolean expression row-wise. + + Parameters + ---------- + expr : str + **kwargs : dict + + Returns + ------- + BaseQueryCompiler + New QueryCompiler containing the rows where the boolean expression is satisfied. + """ + raise NotImplementedError( + "Row-wise queries execution is not implemented for the selected backend." 
+ ) + @doc_utils.add_refer_to("DataFrame.eval") def eval(self, expr, **kwargs): """ diff --git a/modin/core/storage_formats/pandas/parsers.py b/modin/core/storage_formats/pandas/parsers.py index 3305eeb213a..ef298608d7e 100644 --- a/modin/core/storage_formats/pandas/parsers.py +++ b/modin/core/storage_formats/pandas/parsers.py @@ -580,7 +580,6 @@ def parse(fname, **kwargs): num_splits = kwargs.pop("num_splits", None) start = kwargs.pop("start", None) end = kwargs.pop("end", None) - _skiprows = kwargs.pop("skiprows") excel_header = kwargs.get("_header") sheet_name = kwargs.get("sheet_name", 0) footer = b"" @@ -589,6 +588,8 @@ def parse(fname, **kwargs): if start is None or end is None: return pandas.read_excel(fname, **kwargs) + _skiprows = kwargs.pop("skiprows") + from zipfile import ZipFile import openpyxl from openpyxl.worksheet._reader import WorksheetReader @@ -651,7 +652,7 @@ def update_row_nums(match): bytesio = BytesIO(excel_header + bytes_data + footer) # Use openpyxl to read/parse sheet data common_args = (ws, bytesio, ex.shared_strings, False) - if PandasExcelParser.need_rich_text_param: + if PandasExcelParser.need_rich_text_param(): reader = WorksheetReader(*common_args, rich_text=False) else: reader = WorksheetReader(*common_args) diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 5787c3071b8..efb29680269 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -18,6 +18,7 @@ queries for the ``PandasDataframe``. """ +import ast import re import numpy as np import pandas @@ -3073,6 +3074,65 @@ def _list_like_func(self, func, axis, *args, **kwargs): ) return self.__constructor__(new_modin_frame) + def rowwise_query(self, expr, **kwargs): + """ + Query the columns of a ``PandasQueryCompiler`` with a boolean row-wise expression. + + Basically, in row-wise expressions we only allow column names, constants + and other variables captured using the '@' symbol. No function/method + cannot be called inside such expressions. + + Parameters + ---------- + expr : str + Row-wise boolean expression. + **kwargs : dict + Arguments to pass to the ``pandas.DataFrame.query()``. + + Returns + ------- + PandasQueryCompiler + + Raises + ------ + NotImplementedError + In case the passed expression cannot be executed row-wise. 
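# --- Standalone sketch (not part of the patch) mirroring the AST walk implemented below: an
# expression qualifies as row-wise only if it consists of names, constants and
# unary/binary/boolean/comparison operators; anything else (e.g. an ast.Call) disqualifies it.
import ast

def is_row_wise(expr):
    nodes = ast.parse(expr.replace("@", "")).body
    while nodes:
        node = nodes.pop()
        if isinstance(node, ast.Expr):
            node = node.value
        if isinstance(node, ast.UnaryOp):
            nodes.append(node.operand)
        elif isinstance(node, ast.BinOp):
            nodes.extend([node.left, node.right])
        elif isinstance(node, ast.BoolOp):
            nodes.extend(node.values)
        elif isinstance(node, ast.Compare):
            nodes.extend([node.left] + node.comparators)
        elif isinstance(node, (ast.Name, ast.Constant)):
            pass
        else:
            return False
    return True

print(is_row_wise("a > @threshold and b < 3"))  # True  -> can be executed row-wise
print(is_row_wise("a > b.mean()"))              # False -> NotImplementedError, pandas fallback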
+ """ + # Walk through the AST and verify it doesn't contain any nodes that + # prevent us from executing the query row-wise (we're basically + # looking for 'ast.Call') + nodes = ast.parse(expr.replace("@", "")).body + is_row_wise_query = True + + while nodes: + node = nodes.pop() + if isinstance(node, ast.Expr): + node = getattr(node, "value", node) + + if isinstance(node, ast.UnaryOp): + nodes.append(node.operand) + elif isinstance(node, ast.BinOp): + nodes.extend([node.left, node.right]) + elif isinstance(node, ast.BoolOp): + nodes.extend(node.values) + elif isinstance(node, ast.Compare): + nodes.extend([node.left] + node.comparators) + elif isinstance(node, (ast.Name, ast.Constant)): + pass + else: + # if we end up here then the expression is no longer simple + # enough to run it row-wise, so exiting + is_row_wise_query = False + break + + if not is_row_wise_query: + raise NotImplementedError("A non row-wise query was passed.") + + def query_builder(df, **modin_internal_kwargs): + return df.query(expr, inplace=False, **kwargs, **modin_internal_kwargs) + + return self.__constructor__(self._modin_frame.filter(1, query_builder)) + def _callable_func(self, func, axis, *args, **kwargs): """ Apply passed function to each row/column. @@ -3488,6 +3548,10 @@ def agg_func(grp, *args, **kwargs): axis=axis, by=by, operator=lambda grp: agg_func(grp, *agg_args, **agg_kwargs), + # UDFs passed to '.apply()' are allowed to produce results with arbitrary shapes, + # that's why we have to align the partition's shapes/labeling across different + # row partitions + align_result_columns=how == "group_wise", **groupby_kwargs, ) result_qc = self.__constructor__(result) @@ -4125,7 +4189,16 @@ def iloc_mut(partition, row_internal_indices, col_internal_indices, item): Partition data with updated values. """ partition = partition.copy() - partition.iloc[row_internal_indices, col_internal_indices] = item + try: + partition.iloc[row_internal_indices, col_internal_indices] = item + except ValueError: + # `copy` is needed to avoid "ValueError: buffer source array is read-only" for `item` + # because the item may be converted to the type that is in the dataframe. + # TODO: in the future we will need to convert to the correct type manually according + # to the following warning. Example: "FutureWarning: Setting an item of incompatible + # dtype is deprecated and will raise in a future error of pandas. Value '[1.38629436]' + # has dtype incompatible with int64, please explicitly cast to a compatible dtype first." 
+ partition.iloc[row_internal_indices, col_internal_indices] = item.copy() return partition new_modin_frame = self._modin_frame.apply_select_indices( diff --git a/modin/distributed/dataframe/pandas/partitions.py b/modin/distributed/dataframe/pandas/partitions.py index aa61b024b1d..1366826c6bf 100644 --- a/modin/distributed/dataframe/pandas/partitions.py +++ b/modin/distributed/dataframe/pandas/partitions.py @@ -23,18 +23,30 @@ if TYPE_CHECKING: from modin.core.execution.ray.implementations.pandas_on_ray.partitioning import ( PandasOnRayDataframePartition, + PandasOnRayDataframeColumnPartition, + PandasOnRayDataframeRowPartition, ) from modin.core.execution.dask.implementations.pandas_on_dask.partitioning import ( PandasOnDaskDataframePartition, + PandasOnDaskDataframeColumnPartition, + PandasOnDaskDataframeRowPartition, ) - from modin.core.execution.unidist.implementations.pandas_on_unidist.partitioning.partition import ( + from modin.core.execution.unidist.implementations.pandas_on_unidist.partitioning import ( PandasOnUnidistDataframePartition, + PandasOnUnidistDataframeColumnPartition, + PandasOnUnidistDataframeRowPartition, ) PartitionUnionType = Union[ PandasOnRayDataframePartition, PandasOnDaskDataframePartition, PandasOnUnidistDataframePartition, + PandasOnRayDataframeColumnPartition, + PandasOnRayDataframeRowPartition, + PandasOnDaskDataframeColumnPartition, + PandasOnDaskDataframeRowPartition, + PandasOnUnidistDataframeColumnPartition, + PandasOnUnidistDataframeRowPartition, ] else: from typing import Any @@ -85,7 +97,10 @@ def _unwrap_partitions() -> list: [p.drain_call_queue() for p in modin_frame._partitions.flatten()] def get_block(partition: PartitionUnionType) -> np.ndarray: - blocks = partition.list_of_blocks + if hasattr(partition, "force_materialization"): + blocks = partition.force_materialization().list_of_blocks + else: + blocks = partition.list_of_blocks assert ( len(blocks) == 1 ), f"Implementation assumes that partition contains a single block, but {len(blocks)} recieved." @@ -109,6 +124,12 @@ def get_block(partition: PartitionUnionType) -> np.ndarray: "PandasOnRayDataframePartition", "PandasOnDaskDataframePartition", "PandasOnUnidistDataframePartition", + "PandasOnRayDataframeColumnPartition", + "PandasOnRayDataframeRowPartition", + "PandasOnDaskDataframeColumnPartition", + "PandasOnDaskDataframeRowPartition", + "PandasOnUnidistDataframeColumnPartition", + "PandasOnUnidistDataframeRowPartition", ): return _unwrap_partitions() raise ValueError( diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_algebra.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_algebra.py index 8929890d11c..633878c2e25 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_algebra.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_algebra.py @@ -146,14 +146,19 @@ def __init__(self, relOp): self.relOp = relOp @classmethod - def reset_id(cls): + def reset_id(cls, next_id=0): """ - Reset ID to be used for the next new node to 0. + Reset ID to be used for the next new node to `next_id`. Can be used to have a zero-based numbering for each generated query. + + Parameters + ---------- + next_id : int, default: 0 + Next node id. 
""" - cls._next_id[0] = 0 + cls._next_id[0] = next_id class CalciteScanNode(CalciteBaseNode): diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_builder.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_builder.py index bf7237d7db3..ecb3bb1d97d 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_builder.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_builder.py @@ -45,7 +45,7 @@ ) from collections import abc -from pandas.core.dtypes.common import get_dtype, is_bool_dtype +from pandas.core.dtypes.common import get_dtype, is_categorical_dtype, is_bool_dtype class CalciteBuilder: @@ -752,8 +752,10 @@ def _push(self, node): and all(isinstance(expr, CalciteInputRefExpr) for expr in node.exprs) ): # Replace the last CalciteProjectionNode with this one and - # translate the input refs. - exprs = self.res.pop().exprs + # translate the input refs. The `id` attribute is preserved. + last = self.res.pop() + exprs = last.exprs + last.reset_id(int(last.id)) node = CalciteProjectionNode( node.fields, [exprs[expr.input] for expr in node.exprs] ) @@ -903,7 +905,10 @@ def _process_groupby(self, op): bool_cols := { c: cast_agg[agg_exprs[c].agg] for c, t in frame.dtypes.items() - if is_bool_dtype(t) and agg_exprs[c].agg in cast_agg + # Do not call is_bool_dtype() for categorical since it checks all the categories + if not is_categorical_dtype(t) + and is_bool_dtype(t) + and agg_exprs[c].agg in cast_agg } ): trans = self._input_ctx()._maybe_copy_and_translate_expr diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/io/io.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/io/io.py index dd102edbc30..d1c5c156896 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/io/io.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/io/io.py @@ -292,7 +292,7 @@ def _dtype_to_arrow(cls, dtype): tname = dtype if isinstance(dtype, str) else dtype.name if tname == "category": return pa.dictionary(index_type=pa.int32(), value_type=pa.string()) - elif tname == "string": + elif tname == "string" or tname == "object": return pa.string() else: return pa.from_numpy_dtype(tname) diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/test/test_dataframe.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/test/test_dataframe.py index 9653df62477..f2ed946dd76 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/test/test_dataframe.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/test/test_dataframe.py @@ -31,6 +31,8 @@ from .utils import eval_io, ForceHdkImport, set_execution_mode, run_and_compare from pandas.core.dtypes.common import is_list_like +from pyhdk import __version__ as hdk_version + StorageFormat.put("hdk") import modin.pandas as pd @@ -394,6 +396,19 @@ def test(df, lib, **kwargs): run_and_compare(test, data={}) + def test_read_csv_dtype_object(self): + with pytest.warns(UserWarning) as warns: + with ensure_clean(".csv") as file: + with open(file, "w") as f: + f.write("test\ntest") + + def test(**kwargs): + return pd.read_csv(file, dtype={"test": "object"}) + + run_and_compare(test, data={}) + for warn in warns.list: + assert not re.match(r".*defaulting to pandas.*", str(warn)) + class TestMasks: data = { @@ -2064,19 +2079,21 @@ def 
test_cat_codes(self): class TestSort: + # In order for the row order to be deterministic after sorting, + # the `by` columns should not contain duplicate values. data = { - "a": [1, 2, 5, 2, 5, 4, 4, 5, 2], + "a": [1, 2, 5, -2, -5, 4, -4, 6, 3], "b": [1, 2, 3, 6, 5, 1, 4, 5, 3], "c": [5, 4, 2, 3, 1, 1, 4, 5, 6], "d": ["1", "4", "3", "2", "1", "6", "7", "5", "0"], } data_nulls = { - "a": [1, 2, 5, 2, 5, 4, 4, None, 2], + "a": [1, 2, 5, -2, -5, 4, -4, None, 3], "b": [1, 2, 3, 6, 5, None, 4, 5, 3], "c": [None, 4, 2, 3, 1, 1, 4, 5, 6], } data_multiple_nulls = { - "a": [1, 2, None, 2, 5, 4, 4, None, 2], + "a": [1, 2, None, -2, 5, 4, -4, None, 3], "b": [1, 2, 3, 6, 5, None, 4, 5, None], "c": [None, 4, 2, None, 1, 1, 4, 5, 6], } @@ -2094,6 +2111,7 @@ def test_sort_cols(self, cols, ignore_index, index_cols, ascending): def sort(df, cols, ignore_index, index_cols, ascending, **kwargs): if index_cols: df = df.set_index(index_cols) + df_equals_with_non_stable_indices() return df.sort_values(cols, ignore_index=ignore_index, ascending=ascending) run_and_compare( @@ -2119,6 +2137,10 @@ def sort(df, ascending, **kwargs): ascending=ascending, ) + @pytest.mark.skipif( + hdk_version == "0.7.0", + reason="https://github.com/modin-project/modin/issues/6514", + ) @pytest.mark.parametrize("ascending", ascending_values) def test_sort_cols_str(self, ascending): def sort(df, ascending, **kwargs): diff --git a/modin/experimental/core/io/text/csv_glob_dispatcher.py b/modin/experimental/core/io/text/csv_glob_dispatcher.py index 88531efe43c..7de275ee846 100644 --- a/modin/experimental/core/io/text/csv_glob_dispatcher.py +++ b/modin/experimental/core/io/text/csv_glob_dispatcher.py @@ -282,11 +282,21 @@ def file_exists(cls, file_path: str, storage_options=None) -> bool: if not is_fsspec_url(file_path): return len(glob.glob(file_path)) > 0 - from botocore.exceptions import ( - NoCredentialsError, - EndpointConnectionError, - ConnectTimeoutError, - ) + try: + from botocore.exceptions import ( + NoCredentialsError, + EndpointConnectionError, + ConnectTimeoutError, + ) + + credential_error_type = ( + NoCredentialsError, + PermissionError, + EndpointConnectionError, + ConnectTimeoutError, + ) + except ModuleNotFoundError: + credential_error_type = (PermissionError,) if storage_options is not None: new_storage_options = dict(storage_options) @@ -298,12 +308,7 @@ def file_exists(cls, file_path: str, storage_options=None) -> bool: exists = False try: exists = fs.exists(file_path) - except ( - NoCredentialsError, - PermissionError, - EndpointConnectionError, - ConnectTimeoutError, - ): + except credential_error_type: fs, _ = fsspec.core.url_to_fs(file_path, anon=True, **new_storage_options) exists = fs.exists(file_path) return exists or len(fs.glob(file_path)) > 0 @@ -328,14 +333,31 @@ def get_path(cls, file_path: str) -> list: abs_paths = [os.path.abspath(path) for path in relative_paths] return abs_paths - from botocore.exceptions import ( - NoCredentialsError, - EndpointConnectionError, - ConnectTimeoutError, - ) + try: + from botocore.exceptions import ( + NoCredentialsError, + EndpointConnectionError, + ConnectTimeoutError, + ) + + credential_error_type = ( + NoCredentialsError, + PermissionError, + EndpointConnectionError, + ConnectTimeoutError, + ) + except ModuleNotFoundError: + credential_error_type = (PermissionError,) def get_file_path(fs_handle) -> List[str]: - file_paths = fs_handle.glob(file_path) + if "*" in file_path: + file_paths = fs_handle.glob(file_path) + else: + file_paths = [ + f + for f in 
fs_handle.find(file_path) + if not f.endswith("/") # exclude folder + ] if len(file_paths) == 0 and not fs_handle.exists(file_path): raise FileNotFoundError(f"Path <{file_path}> isn't available.") fs_addresses = [fs_handle.unstrip_protocol(path) for path in file_paths] @@ -344,12 +366,7 @@ def get_file_path(fs_handle) -> List[str]: fs, _ = fsspec.core.url_to_fs(file_path) try: return get_file_path(fs) - except ( - NoCredentialsError, - PermissionError, - EndpointConnectionError, - ConnectTimeoutError, - ): + except credential_error_type: fs, _ = fsspec.core.url_to_fs(file_path, anon=True) return get_file_path(fs) diff --git a/modin/experimental/core/storage_formats/hdk/query_compiler.py b/modin/experimental/core/storage_formats/hdk/query_compiler.py index 6fb82148d7c..7e2c449e93c 100644 --- a/modin/experimental/core/storage_formats/hdk/query_compiler.py +++ b/modin/experimental/core/storage_formats/hdk/query_compiler.py @@ -181,6 +181,9 @@ def finalize(self): # TODO: implement this for HDK storage format raise NotImplementedError() + def support_materialization_in_worker_process(self) -> bool: + return True + def to_pandas(self): return self._modin_frame.to_pandas() @@ -396,7 +399,7 @@ def min(self, **kwargs): return self._agg("min", **kwargs) def sum(self, **kwargs): - min_count = kwargs.pop("min_count") + min_count = kwargs.pop("min_count", 0) if min_count != 0: raise NotImplementedError( f"HDK's sum does not support such set of parameters: min_count={min_count}." diff --git a/modin/experimental/sql/hdk/__init__.py b/modin/experimental/sql/hdk/__init__.py new file mode 100644 index 00000000000..31de5addb64 --- /dev/null +++ b/modin/experimental/sql/hdk/__init__.py @@ -0,0 +1,14 @@ +# Licensed to Modin Development Team under one or more contributor license agreements. +# See the NOTICE file distributed with this work for additional information regarding +# copyright ownership. The Modin Development Team licenses this file to you under the +# Apache License, Version 2.0 (the "License"); you may not use this file except in +# compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under +# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific language +# governing permissions and limitations under the License. + +"""Implementation of HDK SQL functionality.""" diff --git a/modin/numpy/test/utils.py b/modin/numpy/test/utils.py index 5e378f4ede7..8b32ae6dac4 100644 --- a/modin/numpy/test/utils.py +++ b/modin/numpy/test/utils.py @@ -16,7 +16,7 @@ import modin.numpy as np -def assert_scalar_or_array_equal(x1, x2, err_msg=None): +def assert_scalar_or_array_equal(x1, x2, err_msg=""): """ Assert whether the result of the numpy and modin computations are the same. 
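# --- Illustrative sketch (not part of the patch; the directory path is an assumption) of the
# path-expansion rule introduced in get_file_path() of csv_glob_dispatcher.py above: a pattern
# containing '*' is expanded with fs.glob(), while a plain path is listed recursively with
# fs.find(), dropping entries that end with '/' so only files are kept.
import fsspec

fs, _ = fsspec.core.url_to_fs("/tmp/modin_csv_dir")
pattern_paths = fs.glob("/tmp/modin_csv_dir/*.csv")
plain_paths = [f for f in fs.find("/tmp/modin_csv_dir") if not f.endswith("/")]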
diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index fb3cf666b88..f62c2ae03ff 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -14,6 +14,7 @@ """Module houses ``DataFrame`` class, that is distributed version of ``pandas.DataFrame``.""" from __future__ import annotations + import pandas from pandas.core.common import apply_if_callable, get_cython_func from pandas.core.computation.eval import _check_engine @@ -35,6 +36,7 @@ ) import datetime +import os import re import itertools import functools @@ -1123,7 +1125,7 @@ def join( if isinstance(other, Series): if other.name is None: raise ValueError("Other Series must have a name") - other = self.__constructor__({other.name: other}) + other = self.__constructor__(other) if on is not None: return self.__constructor__( query_compiler=self._query_compiler.join( @@ -1562,10 +1564,23 @@ def query(self, expr, inplace=False, **kwargs): # noqa: PR01, RT01, D200 Query the columns of a ``DataFrame`` with a boolean expression. """ self._update_var_dicts_in_kwargs(expr, kwargs) + self._validate_eval_query(expr, **kwargs) inplace = validate_bool_kwarg(inplace, "inplace") - new_query_compiler = pandas.DataFrame.query( - self, expr, inplace=False, **kwargs - )._query_compiler + # HACK: this condition kind of breaks the idea of backend agnostic API as all queries + # _should_ work fine for all of the engines using `pandas.DataFrame.query(...)` approach. + # However, at this point we know that we can execute simple queries way more efficiently + # using the QC's API directly in case of pandas backend. Ideally, we have to make it work + # with the 'pandas.query' approach the same as good the direct QC call is. But investigating + # and fixing the root cause of the perf difference appears to be much more complicated + # than putting this hack here. Hopefully, we'll get rid of it soon: + # https://github.com/modin-project/modin/issues/6499 + try: + new_query_compiler = self._query_compiler.rowwise_query(expr, **kwargs) + except NotImplementedError: + # a non row-wise query was passed, falling back to pandas implementation + new_query_compiler = pandas.DataFrame.query( + self, expr, inplace=False, **kwargs + )._query_compiler return self._create_or_update_from_compiler(new_query_compiler, inplace) def rename( @@ -2965,7 +2980,7 @@ def _getitem(self, key): # Persistance support methods - BEGIN @classmethod - def _inflate_light(cls, query_compiler): + def _inflate_light(cls, query_compiler, source_pid): """ Re-creates the object from previously-serialized lightweight representation. @@ -2975,16 +2990,23 @@ def _inflate_light(cls, query_compiler): ---------- query_compiler : BaseQueryCompiler Query compiler to use for object re-creation. + source_pid : int + Determines whether a Modin or pandas object needs to be created. + Modin objects are created only on the main process. Returns ------- DataFrame New ``DataFrame`` based on the `query_compiler`. """ + if os.getpid() != source_pid: + return query_compiler.to_pandas() + # The current logic does not involve creating Modin objects + # and manipulation with them in worker processes return cls(query_compiler=query_compiler) @classmethod - def _inflate_full(cls, pandas_df): + def _inflate_full(cls, pandas_df, source_pid): """ Re-creates the object from previously-serialized disk-storable representation. @@ -2992,18 +3014,29 @@ def _inflate_full(cls, pandas_df): ---------- pandas_df : pandas.DataFrame Data to use for object re-creation. 
+ source_pid : int + Determines whether a Modin or pandas object needs to be created. + Modin objects are created only on the main process. Returns ------- DataFrame New ``DataFrame`` based on the `pandas_df`. """ + if os.getpid() != source_pid: + return pandas_df + # The current logic does not involve creating Modin objects + # and manipulation with them in worker processes return cls(data=from_pandas(pandas_df)) def __reduce__(self): self._query_compiler.finalize() - if PersistentPickle.get(): - return self._inflate_full, (self._to_pandas(),) - return self._inflate_light, (self._query_compiler,) + pid = os.getpid() + if ( + PersistentPickle.get() + or not self._query_compiler.support_materialization_in_worker_process() + ): + return self._inflate_full, (self._to_pandas(), pid) + return self._inflate_light, (self._query_compiler, pid) # Persistance support methods - END diff --git a/modin/pandas/groupby.py b/modin/pandas/groupby.py index 0732d43db54..28c429cabc2 100644 --- a/modin/pandas/groupby.py +++ b/modin/pandas/groupby.py @@ -18,7 +18,12 @@ from pandas.core.apply import reconstruct_func from pandas.errors import SpecificationError import pandas.core.groupby -from pandas.core.dtypes.common import is_list_like, is_numeric_dtype, is_integer +from pandas.core.dtypes.common import ( + is_datetime64_any_dtype, + is_integer, + is_list_like, + is_numeric_dtype, +) from pandas._libs.lib import no_default import pandas.core.common as com from types import BuiltinFunctionType @@ -37,7 +42,6 @@ from modin.pandas.utils import cast_function_modin2pandas from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler from modin.core.dataframe.algebra.default2pandas.groupby import GroupBy -from modin.config import IsExperimental from .series import Series from .window import RollingGroupby from .utils import is_label @@ -182,31 +186,19 @@ def __getattr__(self, key): return self.__getitem__(key) raise err - # TODO: `.__getattribute__` overriding is broken in experimental mode. We should - # remove this branching one it's fixed: - # https://github.com/modin-project/modin/issues/5536 - if not IsExperimental.get(): - - def __getattribute__(self, item): - attr = super().__getattribute__(item) - if ( - item not in _DEFAULT_BEHAVIOUR - and not self._query_compiler.lazy_execution - ): - # We default to pandas on empty DataFrames. This avoids a large amount of - # pain in underlying implementation and returns a result immediately rather - # than dealing with the edge cases that empty DataFrames have. - if ( - callable(attr) - and self._df.empty - and hasattr(self._pandas_class, item) - ): + def __getattribute__(self, item): + attr = super().__getattribute__(item) + if item not in _DEFAULT_BEHAVIOUR and not self._query_compiler.lazy_execution: + # We default to pandas on empty DataFrames. This avoids a large amount of + # pain in underlying implementation and returns a result immediately rather + # than dealing with the edge cases that empty DataFrames have. 
+ if callable(attr) and self._df.empty and hasattr(self._pandas_class, item): - def default_handler(*args, **kwargs): - return self._default_to_pandas(item, *args, **kwargs) + def default_handler(*args, **kwargs): + return self._default_to_pandas(item, *args, **kwargs) - return default_handler - return attr + return default_handler + return attr @property def ngroups(self): @@ -573,16 +565,20 @@ def apply(self, func, *args, **kwargs): if not isinstance(func, BuiltinFunctionType): func = wrap_udf_function(func) - return self._check_index( - self._wrap_aggregation( - qc_method=type(self._query_compiler).groupby_agg, - numeric_only=False, - agg_func=func, - agg_args=args, - agg_kwargs=kwargs, - how="group_wise", - ) + apply_res = self._wrap_aggregation( + qc_method=type(self._query_compiler).groupby_agg, + numeric_only=False, + agg_func=func, + agg_args=args, + agg_kwargs=kwargs, + how="group_wise", ) + reduced_index = pandas.Index([MODIN_UNNAMED_SERIES_LABEL]) + if not isinstance(apply_res, Series) and apply_res.columns.equals( + reduced_index + ): + apply_res = apply_res.squeeze(axis=1) + return self._check_index(apply_res) @property def dtypes(self): @@ -1280,7 +1276,9 @@ def diff(self, periods=1, axis=0): for col, dtype in self._df.dtypes.items(): # can't calculate diff on non-numeric columns, so check for non-numeric # columns that are not included in the `by` - if not is_numeric_dtype(dtype) and not ( + if not ( + is_numeric_dtype(dtype) or is_datetime64_any_dtype(dtype) + ) and not ( isinstance(self._by, BaseQueryCompiler) and col in self._by.columns ): raise TypeError(f"unsupported operand type for -: got {dtype}") @@ -1683,10 +1681,9 @@ def _try_get_str_func(self, fn): Returns ------- str, list - If `fn` is a callable, return its name if it's a method of the groupby - object, otherwise return `fn` itself. If `fn` is a string, return it. - If `fn` is an Iterable, return a list of _try_get_str_func applied to - each element of `fn`. + If `fn` is a callable, return its name, otherwise return `fn` itself. + If `fn` is a string, return it. If `fn` is an Iterable, return a list + of _try_get_str_func applied to each element of `fn`. """ if not isinstance(fn, str) and isinstance(fn, Iterable): return [self._try_get_str_func(f) for f in fn] @@ -1696,7 +1693,7 @@ def _try_get_str_func(self, fn): elif fn is np.min: # np.min is called "amin", so it's not a method of the groupby object. return "amin" - return fn.__name__ if callable(fn) and fn.__name__ in dir(self) else fn + return fn.__name__ if callable(fn) else fn def value_counts( self, diff --git a/modin/pandas/io.py b/modin/pandas/io.py index 8373305d258..62fa75390ef 100644 --- a/modin/pandas/io.py +++ b/modin/pandas/io.py @@ -835,10 +835,21 @@ class ExcelFile(ClassLogger, pandas.ExcelFile): # noqa: PR01, D200 Class for parsing tabular excel sheets into DataFrame objects. 
""" + _behave_like_pandas = False + + def _set_pandas_mode(self): # noqa + # disable Modin behavior to be able to pass object to `pandas.read_excel` + # otherwise, Modin objects may be passed to the pandas context, resulting + # in undefined behavior + self._behave_like_pandas = True + def __getattribute__(self, item): + if item in ["_set_pandas_mode", "_behave_like_pandas"]: + return object.__getattribute__(self, item) + default_behaviors = ["__init__", "__class__"] method = super(ExcelFile, self).__getattribute__(item) - if item not in default_behaviors: + if not self._behave_like_pandas and item not in default_behaviors: if callable(method): def return_handler(*args, **kwargs): diff --git a/modin/pandas/series.py b/modin/pandas/series.py index 45b5316cead..aba2781d47a 100644 --- a/modin/pandas/series.py +++ b/modin/pandas/series.py @@ -14,6 +14,7 @@ """Module houses `Series` class, that is distributed version of `pandas.Series`.""" from __future__ import annotations + import numpy as np import pandas from pandas.io.formats.info import SeriesInfo @@ -29,6 +30,7 @@ from pandas._libs.lib import no_default, NoDefault from pandas._typing import IndexKeyFunc, Axis from typing import Union, Optional, Hashable, TYPE_CHECKING, IO +import os import warnings from modin.logging import disable_logging @@ -686,9 +688,9 @@ def between(self, left, right, inclusive: str = "both"): # noqa: PR01, RT01, D2 """ Return boolean Series equivalent to left <= series <= right. """ - return self.__constructor__( - query_compiler=self._query_compiler.between(left, right, inclusive) - ) + # 'pandas.Series.between()' only uses public Series' API, + # so passing a Modin Series there is safe + return pandas.Series.between(self, left, right, inclusive) def combine(self, other, func, fill_value=None): # noqa: PR01, RT01, D200 """ @@ -2401,7 +2403,7 @@ def _repartition(self): # Persistance support methods - BEGIN @classmethod - def _inflate_light(cls, query_compiler, name): + def _inflate_light(cls, query_compiler, name, source_pid): """ Re-creates the object from previously-serialized lightweight representation. @@ -2413,16 +2415,23 @@ def _inflate_light(cls, query_compiler, name): Query compiler to use for object re-creation. name : str The name to give to the new object. + source_pid : int + Determines whether a Modin or pandas object needs to be created. + Modin objects are created only on the main process. Returns ------- Series New Series based on the `query_compiler`. """ + if os.getpid() != source_pid: + return query_compiler.to_pandas() + # The current logic does not involve creating Modin objects + # and manipulation with them in worker processes return cls(query_compiler=query_compiler, name=name) @classmethod - def _inflate_full(cls, pandas_series): + def _inflate_full(cls, pandas_series, source_pid): """ Re-creates the object from previously-serialized disk-storable representation. @@ -2430,18 +2439,29 @@ def _inflate_full(cls, pandas_series): ---------- pandas_series : pandas.Series Data to use for object re-creation. + source_pid : int + Determines whether a Modin or pandas object needs to be created. + Modin objects are created only on the main process. Returns ------- Series New Series based on the `pandas_series`. 
""" + if os.getpid() != source_pid: + return pandas_series + # The current logic does not involve creating Modin objects + # and manipulation with them in worker processes return cls(data=pandas_series) def __reduce__(self): self._query_compiler.finalize() - if PersistentPickle.get(): - return self._inflate_full, (self._to_pandas(),) - return self._inflate_light, (self._query_compiler, self.name) + pid = os.getpid() + if ( + PersistentPickle.get() + or not self._query_compiler.support_materialization_in_worker_process() + ): + return self._inflate_full, (self._to_pandas(), pid) + return self._inflate_light, (self._query_compiler, self.name, pid) # Persistance support methods - END diff --git a/modin/pandas/test/dataframe/test_join_sort.py b/modin/pandas/test/dataframe/test_join_sort.py index 970bf98717f..348dccb7f43 100644 --- a/modin/pandas/test/dataframe/test_join_sort.py +++ b/modin/pandas/test/dataframe/test_join_sort.py @@ -11,6 +11,8 @@ # ANY KIND, either express or implied. See the License for the specific language # governing permissions and limitations under the License. +import warnings + import pytest import numpy as np import pandas @@ -181,6 +183,26 @@ def test_join_5203(): dfs[0].join([dfs[1], dfs[2]], how="inner", on="a") +def test_join_6602(): + abbreviations = pd.Series( + ["Major League Baseball", "National Basketball Association"], + index=["MLB", "NBA"], + ) + teams = pd.DataFrame( + { + "name": ["Mariners", "Lakers"] * 50, + "league_abbreviation": ["MLB", "NBA"] * 50, + } + ) + + with warnings.catch_warnings(): + # check that join doesn't show UserWarning + warnings.filterwarnings( + "error", "Distributing object", category=UserWarning + ) + teams.set_index("league_abbreviation").join(abbreviations.rename("league_name")) + + @pytest.mark.parametrize( "test_data, test_data2", [ diff --git a/modin/pandas/test/dataframe/test_pickle.py b/modin/pandas/test/dataframe/test_pickle.py index 755c1775a8f..c58306784d9 100644 --- a/modin/pandas/test/dataframe/test_pickle.py +++ b/modin/pandas/test/dataframe/test_pickle.py @@ -17,8 +17,7 @@ import modin.pandas as pd from modin.config import PersistentPickle - -from modin.pandas.test.utils import df_equals +from modin.pandas.test.utils import create_test_dfs, df_equals @pytest.fixture @@ -47,6 +46,33 @@ def test_dataframe_pickle(modin_df, persistent): df_equals(modin_df, other) +def test__reduce__(): + # `DataFrame.__reduce__` will be called implicitly when lambda expressions are + # pre-processed for the distributed engine. 
+    dataframe_data = ["Major League Baseball", "National Basketball Association"]
+    abbr_md, abbr_pd = create_test_dfs(dataframe_data, index=["MLB", "NBA"])
+
+    dataframe_data = {
+        "name": ["Mariners", "Lakers"] * 500,
+        "league_abbreviation": ["MLB", "NBA"] * 500,
+    }
+    teams_md, teams_pd = create_test_dfs(dataframe_data)
+
+    result_md = (
+        teams_md.set_index("name")
+        .league_abbreviation.apply(lambda abbr: abbr_md[0].loc[abbr])
+        .rename("league")
+    )
+
+    result_pd = (
+        teams_pd.set_index("name")
+        .league_abbreviation.apply(lambda abbr: abbr_pd[0].loc[abbr])
+        .rename("league")
+    )
+    df_equals(result_md, result_pd)
+
+
 def test_column_pickle(modin_column, modin_df, persistent):
     dmp = pickle.dumps(modin_column)
     other = pickle.loads(dmp)
diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py
index be3eb4866d8..ca40bec96a0 100644
--- a/modin/pandas/test/test_groupby.py
+++ b/modin/pandas/test/test_groupby.py
@@ -19,7 +19,7 @@ import datetime

 from modin.config import StorageFormat
-from modin.config.envvars import ExperimentalGroupbyImpl
+from modin.config.envvars import ExperimentalGroupbyImpl, IsRayCluster
 from modin.core.dataframe.pandas.partitioning.axis_partition import (
     PandasDataframeAxisPartition,
 )
@@ -1082,6 +1082,24 @@ def test_series_groupby(by, as_index_series_or_dataframe):
         eval_groups(modin_groupby, pandas_groupby)


+def test_agg_udf_6600():
+    data = {
+        "name": ["Mariners", "Lakers"] * 50,
+        "league_abbreviation": ["MLB", "NBA"] * 50,
+    }
+    modin_teams, pandas_teams = create_test_dfs(data)
+
+    def my_first_item(s):
+        return s.iloc[0]
+
+    for agg in (my_first_item, [my_first_item], ["nunique", my_first_item]):
+        eval_general(
+            modin_teams,
+            pandas_teams,
+            operation=lambda df: df.groupby("league_abbreviation").name.agg(agg),
+        )
+
+
 def test_multi_column_groupby():
     pandas_df = pandas.DataFrame(
         {
@@ -2653,6 +2671,21 @@ def test_groupby_pct_change_diff_6194():
     )


+def test_groupby_datetime_diff_6628():
+    dates = pd.date_range(start="2023-01-01", periods=10, freq="W")
+    df = pd.DataFrame(
+        {
+            "date": dates,
+            "group": "A",
+        }
+    )
+    eval_general(
+        df,
+        df._to_pandas(),
+        lambda df: df.groupby("group").diff(),
+    )
+
+
 def eval_rolling(md_window, pd_window):
     eval_general(md_window, pd_window, lambda window: window.count())
     eval_general(md_window, pd_window, lambda window: window.sum())
@@ -2783,3 +2816,105 @@ def perform(lib):
         return getattr(grp, func)()

     eval_general(pd, pandas, perform)
+
+
+# There are two different implementations of partition alignment for the cluster and non-cluster
+# modes; we want to test both of them here, so we simply modify the config for this test.
+@pytest.mark.parametrize(
+    "modify_config",
+    [
+        {ExperimentalGroupbyImpl: True, IsRayCluster: True},
+        {ExperimentalGroupbyImpl: True, IsRayCluster: False},
+    ],
+    indirect=True,
+)
+def test_shape_changing_udf(modify_config):
+    modin_df, pandas_df = create_test_dfs(
+        {
+            "by_col1": ([1] * 50) + ([10] * 50),
+            "col2": np.arange(100),
+            "col3": np.arange(100),
+        }
+    )
+
+    def func1(group):
+        # changes the original shape and indexing of the 'group'
+        return pandas.Series(
+            [1, 2, 3, 4], index=["new_col1", "new_col2", "new_col4", "new_col3"]
+        )
+
+    eval_general(
+        modin_df.groupby("by_col1"),
+        pandas_df.groupby("by_col1"),
+        lambda df: df.apply(func1),
+    )
+
+    def func2(group):
+        # each group has a different shape at the end
+        # (we do .to_frame().T as otherwise this scenario doesn't work in pandas)
+        if group.iloc[0, 0] == 1:
+            return (
+                pandas.Series(
+                    [1, 2, 3, 4], index=["new_col1", "new_col2", "new_col4", "new_col3"]
+                )
+                .to_frame()
+                .T
+            )
+        return (
+            pandas.Series([20, 33, 44], index=["new_col2", "new_col3", "new_col4"])
+            .to_frame()
+            .T
+        )
+
+    eval_general(
+        modin_df.groupby("by_col1"),
+        pandas_df.groupby("by_col1"),
+        lambda df: df.apply(func2),
+    )
+
+    def func3(group):
+        # one of the groups produces an empty dataframe; the result should have
+        # the joined columns of both of these dataframes
+        if group.iloc[0, 0] == 1:
+            return pandas.DataFrame([[1, 2, 3]], index=["col1", "col2", "col3"])
+        return pandas.DataFrame(columns=["col2", "col3", "col4", "col5"])
+
+    eval_general(
+        modin_df.groupby("by_col1"),
+        pandas_df.groupby("by_col1"),
+        lambda df: df.apply(func3),
+    )
+
+
+@pytest.mark.parametrize(
+    "modify_config", [{ExperimentalGroupbyImpl: True}], indirect=True
+)
+def test_reshuffling_groupby_on_strings(modify_config):
+    # reproducer from https://github.com/modin-project/modin/issues/6509
+    modin_df, pandas_df = create_test_dfs(
+        {"col1": ["a"] * 50 + ["b"] * 50, "col2": range(100)}
+    )
+
+    modin_df = modin_df.astype({"col1": "string"})
+    pandas_df = pandas_df.astype({"col1": "string"})
+
+    eval_general(
+        modin_df.groupby("col1"), pandas_df.groupby("col1"), lambda grp: grp.mean()
+    )
+
+
+@pytest.mark.parametrize(
+    "modify_config", [{ExperimentalGroupbyImpl: True}], indirect=True
+)
+def test_groupby_apply_series_result(modify_config):
+    # reproducer from the issue:
+    # https://github.com/modin-project/modin/issues/6632
+    df = pd.DataFrame(
+        np.random.randint(5, 10, size=5), index=[f"s{i+1}" for i in range(5)]
+    )
+    df["group"] = [1, 1, 2, 2, 3]
+
+    # res = df.groupby('group').apply(lambda x: x.name+2)
+    eval_general(
+        df, df._to_pandas(), lambda df: df.groupby("group").apply(lambda x: x.name + 2)
+    )
diff --git a/modin/pandas/test/test_io.py b/modin/pandas/test/test_io.py
index 0649f604746..633afe2b637 100644
--- a/modin/pandas/test/test_io.py
+++ b/modin/pandas/test/test_io.py
@@ -43,6 +43,7 @@ from modin.test.test_utils import warns_that_defaulting_to_pandas
 import pyarrow as pa
 import os
+from io import BytesIO, StringIO
 from scipy import sparse
 import sys
 import sqlalchemy as sa
@@ -611,6 +612,16 @@ def test_read_csv_iteration(self, iterator):

         df_equals(modin_df, pd_df)

+        # Tests #6553
+        if iterator:
+            rdf_reader = pd.read_csv(filename, iterator=iterator)
+            pd_reader = pandas.read_csv(filename, iterator=iterator)
+
+            modin_df = rdf_reader.read()
+            pd_df = pd_reader.read()
+
+            df_equals(modin_df, pd_df)
+
     def test_read_csv_encoding_976(self):
         file_name = "modin/pandas/test/data/issue_976.csv"
         names = [str(i) for i in range(11)]
@@ -1005,8 +1016,6 @@ def test_read_csv_default_to_pandas(self):
             warning_suffix = ""
         with warns_that_defaulting_to_pandas(suffix=warning_suffix):
             # This tests that we default to pandas on a buffer
-            from io import StringIO
-
             with open(pytest.csvs_names["test_read_csv_regular"], "r") as _f:
                 pd.read_csv(StringIO(_f.read()))
@@ -1880,11 +1889,24 @@ def test_read_json_metadata(self, make_json_file):
 @pytest.mark.filterwarnings(default_to_pandas_ignore_string)
 class TestExcel:
     @check_file_leaks
-    def test_read_excel(self, make_excel_file):
+    @pytest.mark.parametrize("pathlike", [False, True])
+    def test_read_excel(self, pathlike, make_excel_file):
+        unique_filename = make_excel_file()
+        eval_io(
+            fn_name="read_excel",
+            # read_excel kwargs
+            io=Path(unique_filename) if pathlike else unique_filename,
+        )
+
+    @check_file_leaks
+    @pytest.mark.parametrize("skiprows", [2, [1, 3], lambda x:
x in [0, 2]]) + def test_read_excel_skiprows(self, skiprows, make_excel_file): eval_io( fn_name="read_excel", # read_excel kwargs io=make_excel_file(), + skiprows=skiprows, + check_kwargs_callable=False, ) @check_file_leaks @@ -2021,11 +2043,54 @@ def test_ExcelFile(self, make_excel_file): try: df_equals(modin_excel_file.parse(), pandas_excel_file.parse()) assert modin_excel_file.io == unique_filename - assert isinstance(modin_excel_file, pd.ExcelFile) finally: modin_excel_file.close() pandas_excel_file.close() + def test_ExcelFile_bytes(self, make_excel_file): + unique_filename = make_excel_file() + with open(unique_filename, mode="rb") as f: + content = f.read() + + modin_excel_file = pd.ExcelFile(content) + pandas_excel_file = pandas.ExcelFile(content) + + df_equals(modin_excel_file.parse(), pandas_excel_file.parse()) + + def test_read_excel_ExcelFile(self, make_excel_file): + unique_filename = make_excel_file() + with open(unique_filename, mode="rb") as f: + content = f.read() + + modin_excel_file = pd.ExcelFile(content) + pandas_excel_file = pandas.ExcelFile(content) + + df_equals(pd.read_excel(modin_excel_file), pandas.read_excel(pandas_excel_file)) + + @pytest.mark.parametrize("use_bytes_io", [False, True]) + def test_read_excel_bytes(self, use_bytes_io, make_excel_file): + unique_filename = make_excel_file() + with open(unique_filename, mode="rb") as f: + io_bytes = f.read() + + if use_bytes_io: + io_bytes = BytesIO(io_bytes) + + eval_io( + fn_name="read_excel", + # read_excel kwargs + io=io_bytes, + ) + + def test_read_excel_file_handle(self, make_excel_file): + unique_filename = make_excel_file() + with open(unique_filename, mode="rb") as f: + eval_io( + fn_name="read_excel", + # read_excel kwargs + io=f, + ) + @pytest.mark.xfail(strict=False, reason="Flaky test, defaults to pandas") def test_to_excel(self, tmp_path): modin_df, pandas_df = create_test_dfs(TEST_DATA) diff --git a/modin/pandas/test/test_series.py b/modin/pandas/test/test_series.py index 0fe8ad944f6..d3c3ce5a06c 100644 --- a/modin/pandas/test/test_series.py +++ b/modin/pandas/test/test_series.py @@ -64,6 +64,7 @@ axis_values, bool_arg_keys, bool_arg_values, + create_test_dfs, int_arg_keys, int_arg_values, encoding_types, @@ -1232,13 +1233,14 @@ def test_array(data): eval_general(modin_series, pandas_series, lambda df: df.array) -@pytest.mark.xfail(reason="Using pandas Series.") @pytest.mark.parametrize("data", test_data_values, ids=test_data_keys) def test_between(data): - modin_series = create_test_series(data) + modin_series, pandas_series = create_test_series(data) - with pytest.raises(NotImplementedError): - modin_series.between(None, None) + df_equals( + modin_series.between(1, 4), + pandas_series.between(1, 4), + ) def test_between_time(): @@ -4833,3 +4835,29 @@ def test_binary_numpy_universal_function_issue_6483(): *create_test_series(test_data["float_nan_data"]), lambda series: np.arctan2(series, np.sin(series)), ) + + +def test__reduce__(): + # `Series.__reduce__` will be called implicitly when lambda expressions are + # pre-processed for the distributed engine. 
+ series_data = ["Major League Baseball", "National Basketball Association"] + abbr_md, abbr_pd = create_test_series(series_data, index=["MLB", "NBA"]) + + dataframe_data = { + "name": ["Mariners", "Lakers"] * 500, + "league_abbreviation": ["MLB", "NBA"] * 500, + } + teams_md, teams_pd = create_test_dfs(dataframe_data) + + result_md = ( + teams_md.set_index("name") + .league_abbreviation.apply(lambda abbr: abbr_md.loc[abbr]) + .rename("league") + ) + + result_pd = ( + teams_pd.set_index("name") + .league_abbreviation.apply(lambda abbr: abbr_pd.loc[abbr]) + .rename("league") + ) + df_equals(result_md, result_pd) diff --git a/modin/test/interchange/dataframe_protocol/pandas/test_protocol.py b/modin/test/interchange/dataframe_protocol/pandas/test_protocol.py index b723ad5884b..356b9bec24d 100644 --- a/modin/test/interchange/dataframe_protocol/pandas/test_protocol.py +++ b/modin/test/interchange/dataframe_protocol/pandas/test_protocol.py @@ -13,6 +13,8 @@ """Dataframe exchange protocol tests that are specific for pandas storage format implementation.""" +import pandas + import modin.pandas as pd from modin.pandas.utils import from_dataframe from modin.pandas.test.utils import df_equals, test_data @@ -54,3 +56,9 @@ def test_categorical_from_dataframe(): {"foo": pd.Series(["0", "1", "2", "3", "0", "3", "2", "3"], dtype="category")} ) eval_df_protocol(modin_df) + + +def test_interchange_with_pandas_string(): + modin_df = pd.DataFrame({"fips": ["01001"]}) + pandas_df = pandas.api.interchange.from_dataframe(modin_df.__dataframe__()) + df_equals(modin_df, pandas_df) diff --git a/modin/test/interchange/dataframe_protocol/test_general.py b/modin/test/interchange/dataframe_protocol/test_general.py index 5a1aa6f9236..b4373835431 100644 --- a/modin/test/interchange/dataframe_protocol/test_general.py +++ b/modin/test/interchange/dataframe_protocol/test_general.py @@ -82,6 +82,14 @@ def test_na_float(df_from_dict): assert colX.null_count == 1 +def test_null_count(df_from_dict): + df = df_from_dict({"foo": [42]}) + dfX = df.__dataframe__() + colX = dfX.get_column_by_name("foo") + null_count = colX.null_count + assert null_count == 0 and type(null_count) is int + + def test_noncategorical(df_from_dict): df = df_from_dict({"a": [1, 2, 3]}) dfX = df.__dataframe__() diff --git a/modin/test/storage_formats/pandas/test_internals.py b/modin/test/storage_formats/pandas/test_internals.py index 154a2490a5e..157e6ab819c 100644 --- a/modin/test/storage_formats/pandas/test_internals.py +++ b/modin/test/storage_formats/pandas/test_internals.py @@ -117,35 +117,30 @@ def construct_modin_df_by_scheme(pandas_df, partitioning_scheme): return md_df -@pytest.fixture -def modify_config(request): - values = request.param - old_values = {} - - for key, value in values.items(): - old_values[key] = key.get() - key.put(value) - - yield # waiting for the test to be completed - # restoring old parameters - for key, value in old_values.items(): - key.put(value) +def validate_partitions_cache(df, axis=None): + """ + Assert that the ``PandasDataframe`` shape caches correspond to the actual partition's shapes. + Parameters + ---------- + df : PandasDataframe + axis : int, optional + An axis to verify the cache for. If not specified, verify cache for both of the axes. 
+    """
+    axis = [0, 1] if axis is None else [axis]

-def validate_partitions_cache(df):
-    """Assert that the ``PandasDataframe`` shape caches correspond to the actual partition's shapes."""
-    row_lengths = df._row_lengths_cache
-    column_widths = df._column_widths_cache
+    axis_lengths = [df._row_lengths_cache, df._column_widths_cache]

-    assert row_lengths is not None
-    assert column_widths is not None
-    assert df._partitions.shape[0] == len(row_lengths)
-    assert df._partitions.shape[1] == len(column_widths)
+    for ax in axis:
+        assert axis_lengths[ax] is not None
+        assert df._partitions.shape[ax] == len(axis_lengths[ax])

     for i in range(df._partitions.shape[0]):
         for j in range(df._partitions.shape[1]):
-            assert df._partitions[i, j].length() == row_lengths[i]
-            assert df._partitions[i, j].width() == column_widths[j]
+            if 0 in axis:
+                assert df._partitions[i, j].length() == axis_lengths[0][i]
+            if 1 in axis:
+                assert df._partitions[i, j].width() == axis_lengths[1][j]


 def test_aligning_blocks():
@@ -1094,3 +1089,194 @@ def test_reindex_preserve_dtypes(kwargs):
     reindexed_df = df.reindex(**kwargs)

     assert reindexed_df._query_compiler._modin_frame.has_materialized_dtypes
+
+
+def test_query_dispatching():
+    """
+    Test that the logic determining whether the passed query can be performed
+    row-wise works correctly in ``PandasQueryCompiler.rowwise_query()``.
+
+    The tested method raises a ``NotImplementedError`` if the query cannot be performed
+    row-wise and raises nothing if it can.
+    """
+    qc = pd.DataFrame(
+        {"a": [1], "b": [2], "c": [3], "d": [4], "e": [5]}
+    )._query_compiler
+
+    local_var = 10  # noqa: F841 (unused variable)
+
+    # these queries should be performed row-wise (so no exception)
+    qc.rowwise_query("a < 1")
+    qc.rowwise_query("a < b")
+    qc.rowwise_query("a < (b + @local_var) * c > 10")
+
+    # these queries cannot be performed row-wise (so they must raise an exception)
+    with pytest.raises(NotImplementedError):
+        qc.rowwise_query("a < b[0]")
+    with pytest.raises(NotImplementedError):
+        qc.rowwise_query("a < b.min()")
+    with pytest.raises(NotImplementedError):
+        qc.rowwise_query("a < (b + @local_var + (b - e.min())) * c > 10")
+    with pytest.raises(NotImplementedError):
+        qc.rowwise_query("a < b.size")
+
+
+def test_sort_values_cache():
+    """
+    Test that the column widths cache after ``.sort_values()`` is valid:
+    https://github.com/modin-project/modin/issues/6607
+    """
+    # 1 row partition and 2 column partitions, in this case '.sort_values()' will use
+    # the row-wise implementation and so the column widths WILL NOT be changed
+    modin_df = construct_modin_df_by_scheme(
+        pandas.DataFrame({f"col{i}": range(100) for i in range(64)}),
+        partitioning_scheme={"row_lengths": [100], "column_widths": [32, 32]},
+    )
+    mf_initial = modin_df._query_compiler._modin_frame
+
+    mf_res = modin_df.sort_values("col0")._query_compiler._modin_frame
+    # check that the row-wise implementation was indeed used (col widths were not changed)
+    assert mf_res._column_widths_cache == [32, 32]
+    # check that the cache and actual col widths match
+    validate_partitions_cache(mf_res, axis=1)
+    # check that the initial frame's cache wasn't changed
+    assert mf_initial._column_widths_cache == [32, 32]
+    validate_partitions_cache(mf_initial, axis=1)
+
+    # 2 row partitions and 2 column partitions, in this case '.sort_values()' will use
+    # the range-partitioning implementation and so the column widths WILL be changed
+    modin_df = construct_modin_df_by_scheme(
+        pandas.DataFrame({f"col{i}": range(100) for i in range(64)}),
partitioning_scheme={"row_lengths": [50, 50], "column_widths": [32, 32]}, + ) + mf_initial = modin_df._query_compiler._modin_frame + + mf_res = modin_df.sort_values("col0")._query_compiler._modin_frame + # check that range-partitioning implementation was indeed used (col widths were changed) + assert mf_res._column_widths_cache == [64] + # check that the cache and actual col widths match + validate_partitions_cache(mf_res, axis=1) + # check that the initial frame's cache wasn't changed + assert mf_initial._column_widths_cache == [32, 32] + validate_partitions_cache(mf_initial, axis=1) + + +class DummyFuture: + """ + A dummy object emulating future's behaviour, this class is used in ``test_call_queue_serialization``. + + It stores a random numeric value representing its data and `was_materialized` state. + Initially this object is considered to be serialized, the state can be changed by calling + the ``.materialize()`` method. + """ + + def __init__(self): + self._value = np.random.randint(0, 1_000_000) + self._was_materialized = False + + def materialize(self): + self._was_materialized = True + return self + + def __eq__(self, other): + if isinstance(other, type(self)) and self._value == other._value: + return True + return False + + +@pytest.mark.parametrize( + "call_queue", + [ + # empty call queue + [], + # a single-function call queue (the function has no argument and it's materialized) + [(0, [], {})], + # a single-function call queue (the function has no argument and it's serialized) + [(DummyFuture(), [], {})], + # a multiple-functions call queue, none of the functions have arguments + [(DummyFuture(), [], {}), (DummyFuture(), [], {}), (0, [], {})], + # a single-function call queue (the function has both positional and keyword arguments) + [ + ( + DummyFuture(), + [DummyFuture()], + { + "a": DummyFuture(), + "b": [DummyFuture()], + "c": [DummyFuture, DummyFuture()], + }, + ) + ], + # a multiple-functions call queue with mixed types of functions/arguments + [ + ( + DummyFuture(), + [1, DummyFuture(), DummyFuture(), [4, 5]], + {"a": [DummyFuture(), 2], "b": DummyFuture(), "c": [1]}, + ), + (0, [], {}), + (0, [1], {}), + (0, [DummyFuture(), DummyFuture()], {}), + ], + ], +) +def test_call_queue_serialization(call_queue): + """ + Test that the process of passing a call queue to Ray's kernel works correctly. + + Before passing a call queue to the kernel that actually executes it, the call queue + is unwrapped into a 1D list using the ``deconstruct_call_queue`` function. After that, + the 1D list is passed as a variable length argument to the kernel ``kernel(*queue)``, + this is done so the Ray engine automatically materialize all the futures that the queue + might have contained. In the end, inside of the kernel, the ``reconstruct_call_queue`` function + is called to rebuild the call queue into its original structure. + + This test emulates the described flow and verifies that it works properly. + """ + from modin.core.execution.ray.implementations.pandas_on_ray.partitioning.partition import ( + deconstruct_call_queue, + reconstruct_call_queue, + ) + + def materialize_queue(*values): + """ + Walk over the `values` and materialize all the future types. + + This function emulates how Ray remote functions materialize their positional arguments. 
+ """ + return [ + val.materialize() if isinstance(val, DummyFuture) else val for val in values + ] + + def assert_everything_materialized(queue): + """Walk over the call queue and verify that all entities there are materialized.""" + + def assert_materialized(obj): + assert ( + isinstance(obj, DummyFuture) and obj._was_materialized + ) or not isinstance(obj, DummyFuture) + + for func, args, kwargs in queue: + assert_materialized(func) + for arg in args: + assert_materialized(arg) + for value in kwargs.values(): + if not isinstance(value, (list, tuple)): + value = [value] + for val in value: + assert_materialized(val) + + ( + num_funcs, + arg_lengths, + kw_key_lengths, + kw_value_lengths, + *queue, + ) = deconstruct_call_queue(call_queue) + queue = materialize_queue(*queue) + reconstructed_queue = reconstruct_call_queue( + num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, queue + ) + + assert call_queue == reconstructed_queue + assert_everything_materialized(reconstructed_queue) diff --git a/modin/test/test_partition_api.py b/modin/test/test_partition_api.py index a90440aa8fe..64bd0ef8167 100644 --- a/modin/test/test_partition_api.py +++ b/modin/test/test_partition_api.py @@ -114,6 +114,28 @@ def get_df(lib, data): ) +def test_unwrap_virtual_partitions(): + # see #5164 for details + data = test_data["int_data"] + df = pd.DataFrame(data) + virtual_partitioned_df = pd.concat([df] * 10) + actual_partitions = np.array(unwrap_partitions(virtual_partitioned_df, axis=None)) + expected_df = pd.concat([pd.DataFrame(data)] * 10) + expected_partitions = expected_df._query_compiler._modin_frame._partitions + assert expected_partitions.shape == actual_partitions.shape + + for row_idx in range(expected_partitions.shape[0]): + for col_idx in range(expected_partitions.shape[1]): + df_equals( + get_func( + expected_partitions[row_idx][col_idx] + .force_materialization() + .list_of_blocks[0] + ), + get_func(actual_partitions[row_idx][col_idx]), + ) + + @pytest.mark.parametrize("column_widths", [None, "column_widths"]) @pytest.mark.parametrize("row_lengths", [None, "row_lengths"]) @pytest.mark.parametrize("columns", [None, "columns"]) diff --git a/requirements/env_unidist.yml b/requirements/env_unidist.yml index eb5d098e06b..5a9351327cd 100644 --- a/requirements/env_unidist.yml +++ b/requirements/env_unidist.yml @@ -7,7 +7,7 @@ dependencies: # required dependencies - pandas>=2,<2.1 - numpy>=1.20.3 - - unidist-mpi>=0.2.1 + - unidist-mpi>=0.2.1,<=0.4.1 - fsspec>=2021.07.0 - packaging>=21.0 - psutil>=5.8.0 diff --git a/setup.py b/setup.py index 596e2f5fe44..a37c747223c 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ # ray==2.5.0 broken: https://github.com/conda-forge/ray-packages-feedstock/issues/100 # pydantic<2: https://github.com/modin-project/modin/issues/6336 ray_deps = ["ray[default]>=1.13.0,!=2.5.0", "pyarrow>=7.0.0", "pydantic<2"] -unidist_deps = ["unidist[mpi]>=0.2.1"] +unidist_deps = ["unidist[mpi]>=0.2.1,<=0.4.1"] spreadsheet_deps = ["modin-spreadsheet>=0.1.0"] sql_deps = ["dfsql>=0.4.2", "pyparsing<=2.4.7"] all_deps = dask_deps + ray_deps + unidist_deps + spreadsheet_deps
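For context on the call-queue test above: deconstruct_call_queue flattens a partition's call queue into a flat list so that, when that list is passed to a remote kernel as variable-length arguments, the engine materializes any futures inside it; reconstruct_call_queue then rebuilds the original (func, args, kwargs) structure inside the kernel. Below is a deliberately simplified round-trip sketch of that idea; this encoding is an assumption and much cruder than Modin's real functions.

    # Simplified sketch of the "flatten the call queue, rebuild it in the kernel" idea.
    def deconstruct(call_queue):
        """Flatten [(func, args, kwargs), ...] into layout metadata plus a flat value list."""
        layout, flat = [], []
        for func, args, kwargs in call_queue:
            layout.append((len(args), list(kwargs.keys())))
            flat.append(func)
            flat.extend(args)
            flat.extend(kwargs.values())
        return layout, flat

    def reconstruct(layout, flat):
        """Rebuild the original call queue from the layout metadata and flat value list."""
        queue, pos = [], 0
        for num_args, kw_keys in layout:
            func = flat[pos]
            args = list(flat[pos + 1 : pos + 1 + num_args])
            kw_start = pos + 1 + num_args
            kwargs = dict(zip(kw_keys, flat[kw_start : kw_start + len(kw_keys)]))
            queue.append((func, args, kwargs))
            pos = kw_start + len(kw_keys)
        return queue

    queue = [(len, [1, 2], {"a": 3}), (sum, [], {})]
    layout, flat = deconstruct(queue)
    # A real engine would pass `flat` as *args to a remote kernel so futures inside it
    # get materialized; here we simply rebuild immediately and check the round trip.
    assert reconstruct(layout, flat) == queue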