From 068453d354a6ea21230bbe5afd715d0ab2d4d125 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Thu, 19 Oct 2023 14:06:09 +0000 Subject: [PATCH] FEAT-#5836: Introduce 'partial' dtypes cache Signed-off-by: Dmitry Chigarev --- modin/core/dataframe/algebra/binary.py | 7 +- .../dataframe/pandas/dataframe/dataframe.py | 100 ++- .../dataframe/pandas/metadata/__init__.py | 4 +- .../core/dataframe/pandas/metadata/dtypes.py | 652 +++++++++++++++++- modin/core/dataframe/pandas/metadata/index.py | 16 + .../storage_formats/base/query_compiler.py | 10 + .../storage_formats/pandas/query_compiler.py | 7 +- .../hdk_on_native/dataframe/dataframe.py | 6 +- modin/pandas/dataframe.py | 5 +- .../storage_formats/pandas/test_internals.py | 319 ++++++++- setup.cfg | 2 +- 11 files changed, 1081 insertions(+), 47 deletions(-) diff --git a/modin/core/dataframe/algebra/binary.py b/modin/core/dataframe/algebra/binary.py index 1dd263167be..5bbfd42dae6 100644 --- a/modin/core/dataframe/algebra/binary.py +++ b/modin/core/dataframe/algebra/binary.py @@ -245,9 +245,10 @@ def try_compute_new_dtypes(first, second, infer_dtypes=None, result_dtype=None, elif infer_dtypes == "common_cast": dtypes = maybe_compute_dtypes_common_cast(first, second, axis=axis) elif infer_dtypes == "float": - dtypes = maybe_compute_dtypes_common_cast(first, second, axis=axis) - if dtypes is not None: - dtypes = dtypes.apply(coerce_int_to_float64) + dtypes = maybe_build_dtypes_series(first, second, dtype=np.dtype(float)) + # dtypes = maybe_compute_dtypes_common_cast(first, second, axis=axis) + # if dtypes is not None: + # dtypes = dtypes.apply(coerce_int_to_float64) else: # For now we only know how to handle `result_dtype == bool` as that's # the only value that is being passed here right now, it's unclear diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index fcdc75de27b..e09a1b55b15 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ 
b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -36,6 +36,7 @@ lazy_metadata_decorator, ) from modin.core.dataframe.pandas.metadata import ( + DtypesDescriptor, LazyProxyCategoricalDtype, ModinDtypes, ModinIndex, @@ -314,14 +315,19 @@ def _maybe_update_proxies(self, dtypes, new_parent=None): new_parent : object, optional A new parent to link the proxies to. If not specified will consider the `self` to be a new parent. + + Returns + ------- + pandas.Series, ModinDtypes or callable """ new_parent = new_parent or self - if isinstance(dtypes, pandas.Series) or ( - isinstance(dtypes, ModinDtypes) and dtypes.is_materialized - ): + if isinstance(dtypes, ModinDtypes): + dtypes = dtypes.maybe_specify_new_frame_ref(new_parent) + if isinstance(dtypes, pandas.Series): for key, value in dtypes.items(): if isinstance(value, LazyProxyCategoricalDtype): dtypes[key] = value._update_proxy(new_parent, column_name=key) + return dtypes def set_dtypes_cache(self, dtypes): """ @@ -329,10 +335,19 @@ def set_dtypes_cache(self, dtypes): Parameters ---------- - dtypes : pandas.Series, ModinDtypes or callable - """ - self._maybe_update_proxies(dtypes) - if isinstance(dtypes, ModinDtypes) or dtypes is None: + dtypes : pandas.Series, ModinDtypes, callable or None + """ + dtypes = self._maybe_update_proxies(dtypes) + if dtypes is None and self.has_materialized_columns: + try: + self._dtypes = ModinDtypes( + DtypesDescriptor( + cols_with_unknown_dtypes=self.columns.tolist(), parent_df=self + ) + ) + except NotImplementedError: + self._dtypes = None + elif isinstance(dtypes, ModinDtypes) or dtypes is None: self._dtypes = dtypes else: self._dtypes = ModinDtypes(dtypes) @@ -354,6 +369,18 @@ def dtypes(self): self.set_dtypes_cache(dtypes) return dtypes + def get_dtypes_set(self): + """ + Get a set of dtypes that are in this dataframe. 
+ + Returns + ------- + set + """ + if isinstance(self._dtypes, ModinDtypes): + return self._dtypes.get_dtypes_set() + return set(self.dtypes.values) + def _compute_dtypes(self, columns=None): """ Compute the data types via TreeReduce pattern for the specified columns. @@ -376,7 +403,13 @@ def dtype_builder(df): if columns is not None: # Sorting positions to request columns in the order they're stored (it's more efficient) numeric_indices = sorted(self.columns.get_indexer_for(columns)) - obj = self._take_2d_positional(col_positions=numeric_indices) + if any(pos < 0 for pos in numeric_indices): + raise KeyError( + f"Some of the columns are not in index: subset={columns}; columns={self.columns}" + ) + obj = self.take_2d_labels_or_positional( + col_labels=self.columns[numeric_indices].tolist() + ) else: obj = self @@ -675,8 +708,11 @@ def _set_columns(self, new_columns): ): return new_columns = self._validate_set_axis(new_columns, self._columns_cache) - if self.has_materialized_dtypes: - self.dtypes.index = new_columns + if isinstance(self._dtypes, ModinDtypes): + new_value = self._dtypes.set_index(new_columns) + self.set_dtypes_cache(new_value) + elif isinstance(self._dtypes, pandas.Series): + self.dtypes.index = new_columns self.set_columns_cache(new_columns) self.synchronize_labels(axis=1) @@ -1146,6 +1182,13 @@ def _take_2d_positional( if self.has_materialized_dtypes: new_dtypes = self.dtypes.iloc[monotonic_col_idx] + elif isinstance(self._dtypes, ModinDtypes): + try: + new_dtypes = self._dtypes.lazy_get( + monotonic_col_idx, numeric_index=True + ) + except NotImplementedError: + new_dtypes = None else: new_dtypes = None else: @@ -1441,6 +1484,11 @@ def _reorder_labels(self, row_positions=None, col_positions=None): col_idx = self.columns[col_positions] if self.has_materialized_dtypes: new_dtypes = self.dtypes.iloc[col_positions] + elif isinstance(self._dtypes, ModinDtypes): + try: + new_dtypes = self._dtypes.lazy_get(col_idx) + except NotImplementedError: + 
new_dtypes = None if len(col_idx) != len(self.columns): # The frame was re-partitioned along the 1 axis during reordering using @@ -3253,22 +3301,24 @@ def broadcast_apply_full_axis( kw = {"row_lengths": None, "column_widths": None} if isinstance(dtypes, str) and dtypes == "copy": kw["dtypes"] = self.copy_dtypes_cache() + elif isinstance(dtypes, DtypesDescriptor): + kw["dtypes"] = ModinDtypes(dtypes) elif dtypes is not None: if isinstance(dtypes, (pandas.Series, ModinDtypes)): kw["dtypes"] = dtypes.copy() else: if new_columns is None: - ( - new_columns, - kw["column_widths"], - ) = self._compute_axis_labels_and_lengths(1, new_partitions) - kw["dtypes"] = ( - pandas.Series(dtypes, index=new_columns) - if is_list_like(dtypes) - else pandas.Series( - [np.dtype(dtypes)] * len(new_columns), index=new_columns + kw["dtypes"] = ModinDtypes( + DtypesDescriptor(remaining_dtype=np.dtype(dtypes)) + ) + else: + kw["dtypes"] = ( + pandas.Series(dtypes, index=new_columns) + if is_list_like(dtypes) + else pandas.Series( + [np.dtype(dtypes)] * len(new_columns), index=new_columns + ) ) - ) if not keep_partitioning: if kw["row_lengths"] is None and new_index is not None: @@ -3662,10 +3712,12 @@ def _compute_new_widths(): if all(obj.has_materialized_columns for obj in (self, *others)): new_columns = self.columns.append([other.columns for other in others]) new_index = joined_index - if self.has_materialized_dtypes and all( - o.has_materialized_dtypes for o in others - ): - new_dtypes = pandas.concat([self.dtypes] + [o.dtypes for o in others]) + try: + new_dtypes = ModinDtypes.concat( + [self.copy_dtypes_cache()] + [o.copy_dtypes_cache() for o in others] + ) + except NotImplementedError: + new_dtypes = None # If we have already cached the width of each column in at least one # of the column's partitions, we can build new_widths for the new # frame. 
Typically, if we know the width for any partition in a diff --git a/modin/core/dataframe/pandas/metadata/__init__.py b/modin/core/dataframe/pandas/metadata/__init__.py index 1836a0d5ffa..4caf23833a6 100644 --- a/modin/core/dataframe/pandas/metadata/__init__.py +++ b/modin/core/dataframe/pandas/metadata/__init__.py @@ -13,7 +13,7 @@ """Utilities and classes to handle work with metadata.""" -from .dtypes import LazyProxyCategoricalDtype, ModinDtypes +from .dtypes import DtypesDescriptor, LazyProxyCategoricalDtype, ModinDtypes from .index import ModinIndex -__all__ = ["ModinDtypes", "ModinIndex", "LazyProxyCategoricalDtype"] +__all__ = ["ModinDtypes", "ModinIndex", "LazyProxyCategoricalDtype", "DtypesDescriptor"] diff --git a/modin/core/dataframe/pandas/metadata/dtypes.py b/modin/core/dataframe/pandas/metadata/dtypes.py index 4cd0463f630..59fab2f32fe 100644 --- a/modin/core/dataframe/pandas/metadata/dtypes.py +++ b/modin/core/dataframe/pandas/metadata/dtypes.py @@ -12,13 +12,521 @@ # governing permissions and limitations under the License. """Module contains class ``ModinDtypes``.""" -from typing import Union +from typing import TYPE_CHECKING, Callable, Optional, Union + +import numpy as np import pandas +from pandas._typing import IndexLabel + +if TYPE_CHECKING: + from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe + from .index import ModinIndex from modin.error_message import ErrorMessage +class DtypesDescriptor: + """ + Describes partial dtypes. + + Parameters + ---------- + known_dtypes : dict[IndexLabel, np.dtype], optional + Columns that we know dtypes for. + cols_with_unknown_dtypes : list[IndexLabel], optional + Column names that have unknown dtypes. If specified together with `remaining_dtype`, must describe all + columns with unknown dtypes, otherwise, the missing columns will be assigned to `remaining_dtype`. + If `cols_with_unknown_dtypes` is incomplete, you must specify `know_all_names=False`. 
+ remaining_dtype : np.dtype, optional + Dtype for columns that are not present neither in `known_dtypes` nor in `cols_with_unknown_dtypes`. + This parameter is intended to describe columns that we known dtypes for, but don't know their + names yet. Note, that this parameter DOESN'T describe dtypes for columns from `cols_with_unknown_dtypes`. + parent_df : PandasDataframe, optional + Dataframe object for which we describe dtypes. This dataframe will be used to compute + missing dtypes on ``.materialize()``. + columns_order : dict[int, IndexLabel], optional + Order of columns in the dataframe. If specified, must describe all the columns of the dataframe. + know_all_names : bool, default: True + Whether `known_dtypes` and `cols_with_unknown_dtypes` contain all column names for this dataframe besides those, + that are being described by `remaining_dtype`. + One can't pass `know_all_names=False` together with `remaining_dtype` as this creates ambiguity + on how to interpret missing columns (whether they belong to `remaining_dtype` or not). + _schema_is_known : bool, optional + Whether `known_dtypes` describe all the columns in the dataframe. This parameter intended mostly + for internal use. + """ + + def __init__( + self, + known_dtypes: Optional[dict[IndexLabel, np.dtype]] = None, + cols_with_unknown_dtypes: Optional[list[IndexLabel]] = None, + remaining_dtype: Optional[np.dtype] = None, + parent_df: Optional["PandasDataframe"] = None, + columns_order: Optional[dict[int, IndexLabel]] = None, + know_all_names=True, + _schema_is_known: Optional[bool] = None, + ): + if not know_all_names and remaining_dtype is not None: + raise RuntimeError( + "It's not allowed to pass 'remaining_dtype' and 'know_all_names=False' at the same time." 
+ ) + # columns with known dtypes + self._known_dtypes: dict[IndexLabel, np.dtype] = ( + {} if known_dtypes is None else dict(known_dtypes) + ) + if known_dtypes is not None and len(self._known_dtypes) != len(known_dtypes): + raise NotImplementedError( + "Duplicated column names are not yet supported by DtypesDescriptor" + ) + # columns with unknown dtypes (they're not described by 'remaining_dtype') + if cols_with_unknown_dtypes is not None and len( + set(cols_with_unknown_dtypes) + ) != len(cols_with_unknown_dtypes): + raise NotImplementedError( + "Duplicated column names are not yet supported by DtypesDescriptor" + ) + self._cols_with_unknown_dtypes: list[IndexLabel] = ( + [] if cols_with_unknown_dtypes is None else cols_with_unknown_dtypes + ) + # whether 'known_dtypes' describe all the columns in the dataframe + if _schema_is_known is None: + self._schema_is_known: bool = ( + len(cols_with_unknown_dtypes) == 0 + if ( + # if 'cols_with_unknown_dtypes' was explicitly specified as an empty list and + # we don't have any 'remaining_dtype', then we assume that 'known_dtypes' are complete + cols_with_unknown_dtypes is not None + and know_all_names + and remaining_dtype is None + and len(self._known_dtypes) > 0 + ) + else False + ) + else: + self._schema_is_known: bool = _schema_is_known + self._know_all_names = know_all_names + # a common dtype for columns that are not present in 'known_dtypes' nor in 'cols_with_unknown_dtypes' + self._remaining_dtype: Optional[np.dtype] = remaining_dtype + self._parent_df: Optional["PandasDataframe"] = parent_df + if columns_order is None: + self._columns_order: Optional[dict[int, IndexLabel]] = None + # try to compute '._columns_order' using 'parent_df' + self.columns_order + else: + if remaining_dtype is not None: + raise RuntimeError( + "Passing 'columns_order' and 'remaining_dtype' is ambiguous. You have to manually " + + "complete 'known_dtypes' using the information from 'columns_order' and 'remaining_dtype'." 
+ ) + elif not self._know_all_names: + raise RuntimeError( + "Passing 'columns_order' and 'know_all_names=False' is ambiguous. You have to manually " + + "complete 'cols_with_unknown_dtypes' using the information from 'columns_order' " + + "and pass 'know_all_names=True'." + ) + elif len(columns_order) != ( + len(self._cols_with_unknown_dtypes) + len(self._known_dtypes) + ): + raise RuntimeError( + "The length of 'columns_order' doesn't match to 'known_dtypes' and 'cols_with_unknown_dtypes'" + ) + self._columns_order: Optional[dict[int, IndexLabel]] = columns_order + + def update_parent(self, new_parent: "PandasDataframe"): + """ + Set new parent dataframe. + + Parameters + ---------- + new_parent : PandasDataframe + """ + self._parent_df = new_parent + for key, value in self._known_dtypes.items(): + if isinstance(value, LazyProxyCategoricalDtype): + self._known_dtypes[key] = value._update_proxy( + new_parent, column_name=key + ) + # try to compute '._columns_order' using 'new_parent' + self.columns_order + + @property + def columns_order(self) -> Optional[dict[int, IndexLabel]]: + """ + Get order of columns for the described dataframe if available. 
+ + Returns + ------- + dict[int, IndexLabel] or None + """ + if self._columns_order is not None: + return self._columns_order + if self._parent_df is None or not self._parent_df.has_materialized_columns: + return None + + self._columns_order = {i: col for i, col in enumerate(self._parent_df.columns)} + # we got information about new columns and thus can potentially + # extend our knowledge about missing dtypes + if len(self._columns_order) > ( + len(self._known_dtypes) + len(self._cols_with_unknown_dtypes) + ): + new_cols = [ + col + for col in self._columns_order.values() + if col not in self._known_dtypes + and col not in self._cols_with_unknown_dtypes + ] + if self._remaining_dtype is not None: + self._known_dtypes.update( + {col: self._remaining_dtype for col in new_cols} + ) + self._remaining_dtype = None + if len(self._cols_with_unknown_dtypes) == 0: + self._schema_is_known = True + else: + self._cols_with_unknown_dtypes.extend(new_cols) + self._know_all_names = True + return self._columns_order + + def __repr__(self): # noqa: GL08 + return ( + f"DtypesDescriptor:\n\tknown dtypes: {self._known_dtypes};\n\t" + + f"remaining dtype: {self._remaining_dtype};\n\t" + + f"cols with unknown dtypes: {self._cols_with_unknown_dtypes};\n\t" + + f"schema is known: {self._schema_is_known};\n\t" + + f"has parent df: {self._parent_df is not None};\n\t" + + f"columns order: {self._columns_order};\n\t" + + f"know all names: {self._know_all_names}" + ) + + def __str__(self): # noqa: GL08 + return self.__repr__() + + def lazy_get( + self, ids: list[Union[IndexLabel, int]], numeric_index: bool = False + ) -> "DtypesDescriptor": + """ + Get dtypes descriptor for a subset of columns without triggering any computations. + + Parameters + ---------- + ids : list of index labels or positional indexers + Columns for the subset. + numeric_index : bool, default: False + Whether `ids` are positional indixes or column labels to take. 
+ + Returns + ------- + DtypesDescriptor + Descriptor that describes dtypes for columns specified in `ids`. + """ + if len(np.unique(ids)) != len(ids): + raise NotImplementedError( + "Duplicated column names are not yet supported by DtypesDescriptor" + ) + + if numeric_index: + if self.columns_order is not None: + ids = [self.columns_order[i] for i in ids] + else: + raise NotImplementedError( + "Can't lazily get columns by positional indixers if the columns order is unknown" + ) + + result = {} + unknown_cols = [] + columns_order = {} + for i, col in enumerate(ids): + columns_order[i] = col + if col in self._cols_with_unknown_dtypes: + unknown_cols.append(col) + continue + dtype = self._known_dtypes.get(col) + if dtype is None and self._remaining_dtype is None: + unknown_cols.append(col) + elif dtype is None and self._remaining_dtype is not None: + result[col] = self._remaining_dtype + else: + result[col] = dtype + remaining_dtype = self._remaining_dtype if len(unknown_cols) != 0 else None + return DtypesDescriptor( + result, + unknown_cols, + remaining_dtype, + self._parent_df, + columns_order=columns_order, + ) + + def copy(self) -> "DtypesDescriptor": + """ + Get a copy of this descriptor. + + Returns + ------- + DtypesDescriptor + """ + return type(self)( + self._known_dtypes.copy(), + self._cols_with_unknown_dtypes.copy(), + self._remaining_dtype, + self._parent_df, + columns_order=self.columns_order, + know_all_names=self._know_all_names, + _schema_is_known=self._schema_is_known, + ) + + def set_index( + self, new_index: Union[pandas.Index, "ModinIndex"] + ) -> "DtypesDescriptor": + """ + Set new column names for this descriptor. + + Parameters + ---------- + new_index : pandas.Index or ModinIndex + + Returns + ------- + DtypesDescriptor + New descriptor with updated column names. + + Notes + ----- + Calling this method on a descriptor that returns ``None`` for ``.columns_order`` + will result into information lose. 
+ """ + if self.columns_order is None: + # we can't map new columns to old columns and lost all dtypes :( + return DtypesDescriptor( + cols_with_unknown_dtypes=new_index, + parent_df=self._parent_df, + know_all_names=True, + ) + + new_self = self.copy() + renamer = {old_c: new_index[i] for i, old_c in new_self.columns_order.items()} + new_self._known_dtypes = { + renamer[old_col]: value for old_col, value in new_self._known_dtypes.items() + } + new_self._cols_with_unknown_dtypes = [ + renamer[old_col] for old_col in new_self._cols_with_unknown_dtypes + ] + new_self._columns_order = { + i: renamer[old_col] for i, old_col in new_self._columns_order.items() + } + return new_self + + def equals(self, other: "DtypesDescriptor") -> bool: + """ + Compare two descriptors for equality. + + Parameters + ---------- + other : DtypesDescriptor + + Returns + ------- + bool + """ + return ( + self._known_dtypes == other._known_dtypes + and set(self._cols_with_unknown_dtypes) + == set(other._cols_with_unknown_dtypes) + and self._remaining_dtype == other._remaining_dtype + and self._schema_is_known == other._schema_is_known + and ( + self.columns_order == other.columns_order + or (self.columns_order is None and other.columns_order is None) + ) + and self._know_all_names == other._know_all_names + ) + + @property + def is_materialized(self) -> bool: + """ + Whether this descriptor contains information about all dtypes in the dataframe. 
+ + Returns + ------- + bool + """ + return self._schema_is_known + + def _materialize_all_names(self): + """Materialize missing column names.""" + if self._know_all_names: + return + + all_cols = self._parent_df.columns + for col in all_cols: + if ( + col not in self._known_dtypes + and col not in self._cols_with_unknown_dtypes + ): + self._cols_with_unknown_dtypes.append(col) + + self._know_all_names = True + + def _materialize_cols_with_unknown_dtypes(self): + """Compute dtypes for cols specified in `._cols_with_unknown_dtypes`.""" + if ( + len(self._known_dtypes) == 0 + and len(self._cols_with_unknown_dtypes) == 0 + and not self._know_all_names + ): + # here we have to compute dtypes for all columns in the dataframe, + # so avoiding columns materialization by setting 'subset=None' + subset = None + else: + if not self._know_all_names: + self._materialize_all_names() + subset = self._cols_with_unknown_dtypes + + if subset is None or len(subset) > 0: + self._known_dtypes.update(self._parent_df._compute_dtypes(subset)) + + self._know_all_names = True + self._cols_with_unknown_dtypes = [] + + def materialize(self): + """Complete information about dtypes.""" + if self.is_materialized: + return + if self._parent_df is None: + raise RuntimeError( + "It's not allowed to call '.materialize()' before '._parent_df' is specified." 
+ ) + + self._materialize_cols_with_unknown_dtypes() + + if self._remaining_dtype is not None: + cols = self._parent_df.columns + self._known_dtypes.update( + { + col: self._remaining_dtype + for col in cols + if col not in self._known_dtypes + } + ) + + # we currently don't guarantee dtypes to be in a proper order: + # https://github.com/modin-project/modin/blob/8a332c1597c54d36f7ccbbd544e186b689f9ceb1/modin/pandas/test/utils.py#L644-L646 + # so restoring the order only if it's possible + if self.columns_order is not None: + assert len(self.columns_order) == len(self._known_dtypes) + self._known_dtypes = { + self.columns_order[i]: self._known_dtypes[self.columns_order[i]] + for i in range(len(self.columns_order)) + } + + self._schema_is_known = True + self._remaining_dtype = None + self._parent_df = None + + def to_series(self) -> pandas.Series: + """ + Convert descriptor to a pandas Series. + + Returns + ------- + pandas.Series + """ + self.materialize() + return pandas.Series(self._known_dtypes) + + def get_dtypes_set(self) -> set[np.dtype]: + """ + Get a set of dtypes from the descriptor. + + Returns + ------- + set[np.dtype] + """ + if len(self._cols_with_unknown_dtypes) > 0 or not self._know_all_names: + self._materialize_cols_with_unknown_dtypes() + known_dtypes: set[np.dtype] = set(self._known_dtypes.values()) + if self._remaining_dtype is not None: + known_dtypes.add(self._remaining_dtype) + return known_dtypes + + @classmethod + def concat( + cls, values: list[Union["DtypesDescriptor", pandas.Series, None]] + ) -> "DtypesDescriptor": # noqa: GL08 + """ + Concatenate dtypes descriptors into a single descriptor. 
+ + Parameters + ---------- + values : list of DtypesDescriptors and pandas.Series + + Returns + ------- + DtypesDescriptor + """ + known_dtypes = {} + cols_with_unknown_dtypes = [] + schema_is_known = True + # some default value to not mix it with 'None' + remaining_dtype = "default" + know_all_names = True + + for val in values: + if isinstance(val, cls): + all_new_cols = ( + list(val._known_dtypes.keys()) + val._cols_with_unknown_dtypes + ) + if any( + col in known_dtypes or col in cols_with_unknown_dtypes + for col in all_new_cols + ): + raise NotImplementedError( + "Duplicated column names are not yet supported by DtypesDescriptor" + ) + know_all_names &= val._know_all_names + known_dtypes.update(val._known_dtypes) + cols_with_unknown_dtypes.extend(val._cols_with_unknown_dtypes) + if know_all_names: + if ( + remaining_dtype == "default" + and val._remaining_dtype is not None + ): + remaining_dtype = val._remaining_dtype + elif ( + remaining_dtype != "default" + and val._remaining_dtype is not None + and remaining_dtype != val._remaining_dtype + ): + remaining_dtype = None + know_all_names = False + else: + remaining_dtype = None + schema_is_known &= val._schema_is_known + elif isinstance(val, pandas.Series): + if any( + col in known_dtypes or col in cols_with_unknown_dtypes + for col in val.index + ): + raise NotImplementedError( + "Duplicated column names are not yet supported by DtypesDescriptor" + ) + known_dtypes.update(val) + elif val is None: + remaining_dtype = None + schema_is_known = False + know_all_names = False + else: + raise NotImplementedError(type(val)) + return cls( + known_dtypes, + cols_with_unknown_dtypes, + None if remaining_dtype == "default" else remaining_dtype, + parent_df=None, + _schema_is_known=schema_is_known, + know_all_names=know_all_names, + ) + + class ModinDtypes: """ A class that hides the various implementations of the dtypes needed for optimization. 
@@ -28,10 +536,19 @@ class ModinDtypes: value : pandas.Series or callable """ - def __init__(self, value): - if value is None: + def __init__(self, value: Union[Callable, pandas.Series, DtypesDescriptor]): + if callable(value) or isinstance(value, pandas.Series): + self._value = value + elif isinstance(value, DtypesDescriptor): + self._value = value.to_series() if value.is_materialized else value + else: raise ValueError(f"ModinDtypes doesn't work with '{value}'") - self._value = value + + def __repr__(self): # noqa: GL08 + return f"ModinDtypes:\n\tvalue type: {type(self._value)};\n\tvalue:\n\t{self._value}" + + def __str__(self): # noqa: GL08 + return self.__repr__() @property def is_materialized(self) -> bool: @@ -44,6 +561,131 @@ def is_materialized(self) -> bool: """ return isinstance(self._value, pandas.Series) + def get_dtypes_set(self) -> set[np.dtype]: + """ + Get a set of dtypes from the descriptor. + + Returns + ------- + set[np.dtype] + """ + if isinstance(self._value, DtypesDescriptor): + return self._value.get_dtypes_set() + if not self.is_materialized: + self.get() + return set(self._value.values) + + def maybe_specify_new_frame_ref( + self, new_parent: "PandasDataframe" + ) -> "ModinDtypes": + """ + Set a new parent for the stored value if needed. + + Parameters + ---------- + new_parent : PandasDataframe + + Returns + ------- + ModinDtypes + A copy of ``ModinDtypes`` with updated parent. + """ + new_self = self.copy() + if new_self.is_materialized: + for key, value in new_self._value.items(): + if isinstance(value, LazyProxyCategoricalDtype): + new_self._value[key] = value._update_proxy( + new_parent, column_name=key + ) + return new_self + if isinstance(self._value, DtypesDescriptor): + new_self._value.update_parent(new_parent) + return new_self + return new_self + + def lazy_get(self, ids: list, numeric_index: bool = False) -> "ModinDtypes": + """ + Get new ``ModinDtypes`` for a subset of columns without triggering any computations. 
+ + Parameters + ---------- + ids : list of index labels or positional indexers + Columns for the subset. + numeric_index : bool, default: False + Whether `ids` are positional indixes or column labels to take. + + Returns + ------- + ModinDtypes + ``ModinDtypes`` that describes dtypes for columns specified in `ids`. + """ + if isinstance(self._value, DtypesDescriptor): + res = self._value.lazy_get(ids, numeric_index) + return ModinDtypes(res) + elif callable(self._value): + new_self = self.copy() + old_value = new_self._value + new_self._value = ( + lambda: old_value().iloc[ids] if numeric_index else old_value()[ids] + ) + return new_self + ErrorMessage.catch_bugs_and_request_email( + failure_condition=not self.is_materialized + ) + return ModinDtypes(self._value.iloc[ids] if numeric_index else self._value[ids]) + + @classmethod + def concat(cls, values: list) -> "ModinDtypes": + """ + Concatenate dtypes.. + + Parameters + ---------- + values : list of DtypesDescriptors, pandas.Series, ModinDtypes and Nones + + Returns + ------- + ModinDtypes + """ + preprocessed_vals = [] + for val in values: + if isinstance(val, cls): + val = val._value + if isinstance(val, (DtypesDescriptor, pandas.Series)) or val is None: + preprocessed_vals.append(val) + else: + raise NotImplementedError(type(val)) + + desc = DtypesDescriptor.concat(preprocessed_vals) + return ModinDtypes(desc) + + def set_index(self, new_index: Union[pandas.Index, "ModinIndex"]) -> "ModinDtypes": + """ + Set new column names for stored dtypes. + + Parameters + ---------- + new_index : pandas.Index or ModinIndex + + Returns + ------- + ModinDtypes + New ``ModinDtypes`` with updated column names. 
+ """ + new_self = self.copy() + if self.is_materialized: + new_self._value.index = new_index + return new_self + elif callable(self._value): + old_val = new_self._value + new_self._value = lambda: old_val().set_axis(new_index) + return new_self + elif isinstance(new_self._value, DtypesDescriptor): + new_self._value = new_self._value.set_index(new_index) + return new_self + else: + raise NotImplementedError() + def get(self) -> pandas.Series: """ Get the materialized internal representation. @@ -57,6 +699,8 @@ def get(self) -> pandas.Series: self._value = self._value() if self._value is None: self._value = pandas.Series([]) + elif isinstance(self._value, DtypesDescriptor): + self._value = self._value.to_series() else: raise NotImplementedError(type(self._value)) return self._value diff --git a/modin/core/dataframe/pandas/metadata/index.py b/modin/core/dataframe/pandas/metadata/index.py index 0d4ec3ed28c..6242b61e2f8 100644 --- a/modin/core/dataframe/pandas/metadata/index.py +++ b/modin/core/dataframe/pandas/metadata/index.py @@ -249,6 +249,22 @@ def __reduce__(self): }, ) + def __getitem__(self, key): + """ + Get an index value at the position of `key`. + + Parameters + ---------- + key : int + + Returns + ------- + label + """ + if not self.is_materialized: + self.get() + return self._value[key] + def __getattr__(self, name): """ Redirect access to non-existent attributes to the internal representation. diff --git a/modin/core/storage_formats/base/query_compiler.py b/modin/core/storage_formats/base/query_compiler.py index ada3f3db3d3..f2603b2ccb2 100644 --- a/modin/core/storage_formats/base/query_compiler.py +++ b/modin/core/storage_formats/base/query_compiler.py @@ -4526,6 +4526,16 @@ def set_index_names(self, names, axis=0): """ self.get_axis(axis).names = names + def get_dtypes_set(self): + """ + Get a set of dtypes that are in this query compiler. 
+ + Returns + ------- + set + """ + return set(self.dtypes.values) + # DateTime methods def between_time(self, **kwargs): # noqa: PR01 """ diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 99dcb2f77e4..f6bb4b45a73 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -313,6 +313,9 @@ def from_dataframe(cls, df, data_cls): def dtypes(self): return self._modin_frame.dtypes + def get_dtypes_set(self): + return self._modin_frame.get_dtypes_set() + # END Index, columns, and dtypes objects # Metadata modification methods @@ -4300,13 +4303,13 @@ def map_fn(df): # pragma: no cover # than it would be to reuse the code for specific columns. if len(columns) == len(self.columns): new_modin_frame = self._modin_frame.apply_full_axis( - 0, map_fn, new_index=self.index + 0, map_fn, new_index=self.index, dtypes=bool ) untouched_frame = None else: new_modin_frame = self._modin_frame.take_2d_labels_or_positional( col_labels=columns - ).apply_full_axis(0, map_fn, new_index=self.index) + ).apply_full_axis(0, map_fn, new_index=self.index, dtypes=bool) untouched_frame = self.drop(columns=columns) # If we mapped over all the data we are done. 
If not, we need to # prepend the `new_modin_frame` with the raw data from the columns that were diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py index faeb24e8c48..6c60ae0ce62 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py @@ -505,12 +505,12 @@ def _dtypes_for_exprs(self, exprs): @_inherit_docstrings(PandasDataframe._maybe_update_proxies) def _maybe_update_proxies(self, dtypes, new_parent=None): if new_parent is not None: - super()._maybe_update_proxies(dtypes, new_parent) + return super()._maybe_update_proxies(dtypes, new_parent) if self._partitions is None: - return + return dtypes table = self._partitions[0][0].get() if isinstance(table, pyarrow.Table): - super()._maybe_update_proxies(dtypes, new_parent=table) + return super()._maybe_update_proxies(dtypes, new_parent=table) def groupby_agg(self, by, axis, agg, groupby_args, **kwargs): """ diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index a98199ee195..9ec920e1ea0 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -2875,8 +2875,9 @@ def _validate_dtypes(self, numeric_only=False): # Series.__getitem__ treating keys as positions is deprecated. In a future version, # integer keys will always be treated as labels (consistent with DataFrame behavior). 
# To access a value by position, use `ser.iloc[pos]` - dtype = self.dtypes.iloc[0] - for t in self.dtypes: + dtypes = self._query_compiler.get_dtypes_set() + dtype = next(iter(dtypes)) + for t in dtypes: if numeric_only and not is_numeric_dtype(t): raise TypeError("{0} is not a numeric data type".format(t)) elif not numeric_only and t != dtype: diff --git a/modin/test/storage_formats/pandas/test_internals.py b/modin/test/storage_formats/pandas/test_internals.py index 71310f3ee5d..67dafe87377 100644 --- a/modin/test/storage_formats/pandas/test_internals.py +++ b/modin/test/storage_formats/pandas/test_internals.py @@ -22,6 +22,10 @@ import modin.pandas as pd from modin.config import Engine, ExperimentalGroupbyImpl, MinPartitionSize, NPartitions from modin.core.dataframe.pandas.dataframe.utils import ColumnInfo, ShuffleSortFunctions +from modin.core.dataframe.pandas.metadata import ( + DtypesDescriptor, + LazyProxyCategoricalDtype, +) from modin.core.storage_formats.pandas.utils import split_result_of_axis_func_pandas from modin.distributed.dataframe.pandas import from_partitions from modin.pandas.test.utils import create_test_dfs, df_equals, test_data_values @@ -1011,7 +1015,7 @@ def test_merge_preserves_metadata(has_cols_metadata, has_dtypes_metadata): if has_dtypes_metadata: # Verify that there were initially materialized metadata - assert modin_frame.has_dtypes_cache + assert modin_frame.has_materialized_dtypes else: modin_frame.set_dtypes_cache(None) @@ -1020,18 +1024,18 @@ def test_merge_preserves_metadata(has_cols_metadata, has_dtypes_metadata): if has_cols_metadata: assert res.has_materialized_columns if has_dtypes_metadata: - assert res.has_dtypes_cache + assert res.has_materialized_dtypes else: # Verify that no materialization was triggered - assert not res.has_dtypes_cache - assert not modin_frame.has_dtypes_cache + assert not res.has_materialized_dtypes + assert not modin_frame.has_materialized_dtypes else: # Verify that no materialization was triggered 
assert not res.has_materialized_columns - assert not res.has_dtypes_cache + assert not res.has_materialized_dtypes assert not modin_frame.has_materialized_columns if not has_dtypes_metadata: - assert not modin_frame.has_dtypes_cache + assert not modin_frame.has_materialized_dtypes def test_binary_op_preserve_dtypes(): @@ -1511,3 +1515,306 @@ def assert_materialized(obj): assert call_queue == reconstructed_queue assert_everything_materialized(reconstructed_queue) + + +class TestDtypesDescriptor: + schema = pandas.Series( + { + "a": np.dtype(int), + "b": np.dtype(float), + "c": np.dtype(bool), + "d": np.dtype(bool), + "e": np.dtype("object"), + } + ) + + def get_columns_order(self, cols): + """Return a value to be passed as ``DtypesDescriptor(columns_order=...)`` parameter.""" + return {i: col for i, col in enumerate(cols)} + + class DummyDf: + def __init__(self, schema): + self._schema = pandas.Series(schema) + # record calls to verify that we haven't materialized more than needed + self.history = [] + + def _compute_dtypes(self, subset=None): + self.history.append(("_compute_dtypes", subset)) + return self._schema if subset is None else self._schema[subset] + + @property + def columns(self): + self.history.append(("columns",)) + return self._schema.index + + @property + def has_materialized_columns(self): + # False, to make descriptor avoid materialization at all cost + return False + + def test_get_dtypes_set(self): + """ + Test that ``DtypesDescriptor.get_dtypes_set()`` returns valid values and doesn't + trigger unnecessary computations. 
+ """ + df = self.DummyDf(self.schema) + desc = DtypesDescriptor( + self.schema[["a", "b"]], know_all_names=False, parent_df=df + ) + res = desc.get_dtypes_set() + exp = self.schema.values + assert res == set(exp) + # since 'know_all_names=False', we first have to retrieve columns + # in order to determine missing dtypes and then call '._compute_dtypes()' + # only on a subset + assert len(df.history) == 2 and df.history == [ + ("columns",), + ("_compute_dtypes", ["c", "d", "e"]), + ] + + df = self.DummyDf(self.schema) + desc = DtypesDescriptor( + self.schema[["a", "b"]], + cols_with_unknown_dtypes=["c", "d", "e"], + parent_df=df, + ) + res = desc.get_dtypes_set() + exp = self.schema.values + assert res == set(exp) + # here we already know names for cols with unknown dtypes, so only + # calling '._compute_dtypes()' on a subset + assert len(df.history) == 1 and df.history[0] == ( + "_compute_dtypes", + ["c", "d", "e"], + ) + + df = self.DummyDf(self.schema[["a", "b", "c", "d"]]) + desc = DtypesDescriptor( + self.schema[["a", "b"]], remaining_dtype=np.dtype(bool), parent_df=df + ) + res = desc.get_dtypes_set() + exp = self.schema[["a", "b", "c", "d"]].values + assert res == set(exp) + # we don't need to access 'parent_df' in order to get dtypes set, as we + # can infer it from 'known_dtypes' and 'remaining_dtype' + assert len(df.history) == 0 + + df = self.DummyDf(self.schema) + desc = DtypesDescriptor(know_all_names=False, parent_df=df) + res = desc.get_dtypes_set() + exp = self.schema.values + assert res == set(exp) + # compute dtypes for all columns + assert len(df.history) == 1 and df.history[0] == ("_compute_dtypes", None) + + df = self.DummyDf(self.schema) + desc = DtypesDescriptor( + cols_with_unknown_dtypes=self.schema.index.tolist(), parent_df=df + ) + res = desc.get_dtypes_set() + exp = self.schema.values + assert res == set(exp) + # compute dtypes for all columns + assert len(df.history) == 1 and df.history[0] == ( + "_compute_dtypes", + 
self.schema.index.tolist(),
+        )
+
+        df = self.DummyDf(self.schema)
+        desc = DtypesDescriptor(
+            cols_with_unknown_dtypes=["a", "b", "e"],
+            remaining_dtype=np.dtype(bool),
+            parent_df=df,
+        )
+        res = desc.get_dtypes_set()
+        exp = self.schema.values
+        assert res == set(exp)
+        # here we already know names for cols with unknown dtypes, so only
+        # calling '._compute_dtypes()' on a subset
+        assert len(df.history) == 1 and df.history[0] == (
+            "_compute_dtypes",
+            ["a", "b", "e"],
+        )
+
+    def test_lazy_get(self):
+        """
+        Test that ``DtypesDescriptor.lazy_get()`` works properly.
+
+        In this test we never specify `parent_df` for a descriptor, verifying that
+        ``.lazy_get()`` never triggers any computations.
+        """
+        desc = DtypesDescriptor(self.schema[["a", "b"]])
+        subset = ["a", "c", "e"]
+        res = desc.lazy_get(subset)
+        exp = DtypesDescriptor(
+            self.schema[subset[:1]],
+            cols_with_unknown_dtypes=subset[1:],
+            columns_order=self.get_columns_order(subset),
+        )
+        assert res.equals(exp)
+
+        desc = DtypesDescriptor(self.schema[["a", "b"]], remaining_dtype=np.dtype(bool))
+        subset = ["a", "c", "d"]
+        res = desc.lazy_get(subset)
+        exp = DtypesDescriptor(
+            # dtypes for 'c' and 'd' were inferred from 'remaining_dtype' parameter
+            self.schema[subset],
+            columns_order=self.get_columns_order(subset),
+            _schema_is_known=True,
+        )
+        assert res.equals(exp)
+
+        desc = DtypesDescriptor()
+        subset = ["a", "c", "d"]
+        res = desc.lazy_get(subset)
+        exp = DtypesDescriptor(
+            cols_with_unknown_dtypes=subset,
+            columns_order=self.get_columns_order(subset),
+        )
+        assert res.equals(exp)
+
+        desc = DtypesDescriptor(remaining_dtype=np.dtype(bool))
+        subset = ["c", "d"]
+        res = desc.lazy_get(subset)
+        exp = DtypesDescriptor(
+            # dtypes for 'c' and 'd' were inferred from 'remaining_dtype' parameter
+            self.schema[subset],
+            columns_order=self.get_columns_order(subset),
+            _schema_is_known=True,
+        )
+        assert res.equals(exp)
+
+    def test_concat(self):
+        res = DtypesDescriptor.concat(
+            [
+
DtypesDescriptor(self.schema[["a", "b"]]), + DtypesDescriptor(self.schema[["c", "d"]]), + ] + ) + # simply concat known schemas + exp = DtypesDescriptor(self.schema[["a", "b", "c", "d"]]) + assert res.equals(exp) + + res = DtypesDescriptor.concat( + [ + DtypesDescriptor(self.schema[["a", "b"]]), + DtypesDescriptor(remaining_dtype=np.dtype(bool)), + ] + ) + # none of the descriptors had missing column names, so we can preserve 'remaining_dtype' + exp = DtypesDescriptor(self.schema[["a", "b"]], remaining_dtype=np.dtype(bool)) + assert res.equals(exp) + + res = DtypesDescriptor.concat( + [ + DtypesDescriptor(self.schema[["a", "b"]], know_all_names=False), + DtypesDescriptor(remaining_dtype=np.dtype(bool)), + ] + ) + # can't preserve 'remaining_dtype' since first descriptor has unknown column names + exp = DtypesDescriptor(self.schema[["a", "b"]], know_all_names=False) + assert res.equals(exp) + + res = DtypesDescriptor.concat( + [ + DtypesDescriptor(self.schema[["a", "b"]]), + DtypesDescriptor( + cols_with_unknown_dtypes=["d", "e"], know_all_names=False + ), + DtypesDescriptor(remaining_dtype=np.dtype(bool)), + ] + ) + # can't preserve 'remaining_dtype' since second descriptor has unknown column names + exp = DtypesDescriptor( + self.schema[["a", "b"]], + cols_with_unknown_dtypes=["d", "e"], + know_all_names=False, + ) + assert res.equals(exp) + + res = DtypesDescriptor.concat( + [ + DtypesDescriptor( + self.schema[["a", "b"]], + ), + DtypesDescriptor( + cols_with_unknown_dtypes=["d", "e"], + ), + DtypesDescriptor(remaining_dtype=np.dtype(bool)), + ] + ) + # none of the descriptors had missing column names, so we can preserve 'remaining_dtype' + exp = DtypesDescriptor( + self.schema[["a", "b"]], + cols_with_unknown_dtypes=["d", "e"], + remaining_dtype=np.dtype(bool), + ) + assert res.equals(exp) + + res = DtypesDescriptor.concat( + [ + DtypesDescriptor( + self.schema[["a", "b"]], remaining_dtype=np.dtype(bool) + ), + DtypesDescriptor( + cols_with_unknown_dtypes=["d", 
"e"], remaining_dtype=np.dtype(float) + ), + DtypesDescriptor(remaining_dtype=np.dtype(bool)), + ] + ) + # remaining dtypes don't match, so we drop them and set 'know_all_names=False' + exp = DtypesDescriptor( + self.schema[["a", "b"]], + cols_with_unknown_dtypes=["d", "e"], + know_all_names=False, + ) + assert res.equals(exp) + + def test_update_parent(self): + """ + Test that updating parents in ``DtypesDescriptor`` also propagates to stored lazy categoricals. + """ + # 'df1' will have a materialized 'pandas.Series' as dtypes cache + df1 = pd.DataFrame({"a": [1, 1, 2], "b": [3, 4, 5]}).astype({"a": "category"}) + assert isinstance(df1.dtypes["a"], LazyProxyCategoricalDtype) + + # 'df2' will have a 'DtypesDescriptor' with unknown dtypes for a column 'c' + df2 = pd.DataFrame({"c": [2, 3, 4]}) + df2._query_compiler._modin_frame.set_dtypes_cache(None) + dtypes_cache = df2._query_compiler._modin_frame._dtypes + assert isinstance( + dtypes_cache._value, DtypesDescriptor + ) and dtypes_cache._value._cols_with_unknown_dtypes == ["c"] + + # concatenating 'df1' and 'df2' to get a 'DtypesDescriptor' storing lazy categories + # in its 'known_dtypes' field + res = pd.concat([df1, df2], axis=1) + old_parent = df1._query_compiler._modin_frame + new_parent = res._query_compiler._modin_frame + dtypes_cache = new_parent._dtypes._value + + # verifying that the reference for lazy categories to a new parent was updated + assert dtypes_cache._parent_df is new_parent + assert dtypes_cache._known_dtypes["a"]._parent is new_parent + assert old_parent._dtypes["a"]._parent is old_parent + + +class TestZeroComputationDtypes: + """ + Test cases that shouldn't trigger dtypes computation during their execution. 
+ """ + + def test_get_dummies_case(self): + from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe + + with mock.patch.object(PandasDataframe, "_compute_dtypes") as patch: + df = pd.DataFrame( + {"items": [1, 2, 3, 4], "b": [3, 3, 4, 4], "c": [1, 0, 0, 1]} + ) + res = pd.get_dummies(df, columns=["b", "c"]) + cols = [col for col in res.columns if col != "items"] + res[cols] = res[cols] / res[cols].mean() + + assert res._query_compiler._modin_frame.has_materialized_dtypes + + patch.assert_not_called() diff --git a/setup.cfg b/setup.cfg index 8f479523bec..689f3ba32ad 100644 --- a/setup.cfg +++ b/setup.cfg @@ -12,7 +12,7 @@ tag_prefix = parentdir_prefix = modin- [tool:pytest] -addopts = --cov-config=setup.cfg --cov=modin --cov-append --cov-report= +addopts = xfail_strict=true markers = xfail_executions