Skip to content

Commit

Permalink
FEAT-#5836: Introduce 'partial' dtypes cache (#6663)
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <[email protected]>
  • Loading branch information
dchigarev authored Nov 17, 2023
1 parent 1b36f4c commit b7bf9b5
Show file tree
Hide file tree
Showing 9 changed files with 1,230 additions and 46 deletions.
108 changes: 81 additions & 27 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
lazy_metadata_decorator,
)
from modin.core.dataframe.pandas.metadata import (
DtypesDescriptor,
LazyProxyCategoricalDtype,
ModinDtypes,
ModinIndex,
Expand Down Expand Up @@ -314,25 +315,39 @@ def _maybe_update_proxies(self, dtypes, new_parent=None):
new_parent : object, optional
A new parent to link the proxies to. If not specified
will consider the `self` to be a new parent.
Returns
-------
pandas.Series, ModinDtypes or callable
"""
new_parent = new_parent or self
if isinstance(dtypes, pandas.Series) or (
isinstance(dtypes, ModinDtypes) and dtypes.is_materialized
):
for key, value in dtypes.items():
if isinstance(value, LazyProxyCategoricalDtype):
dtypes[key] = value._update_proxy(new_parent, column_name=key)
if isinstance(dtypes, ModinDtypes):
dtypes = dtypes.maybe_specify_new_frame_ref(new_parent)
if isinstance(dtypes, pandas.Series):
LazyProxyCategoricalDtype.update_dtypes(dtypes, new_parent)
return dtypes

def set_dtypes_cache(self, dtypes):
"""
Set dtypes cache.
Parameters
----------
dtypes : pandas.Series, ModinDtypes or callable
"""
self._maybe_update_proxies(dtypes)
if isinstance(dtypes, ModinDtypes) or dtypes is None:
dtypes : pandas.Series, ModinDtypes, callable or None
"""
dtypes = self._maybe_update_proxies(dtypes)
if dtypes is None and self.has_materialized_columns:
# try to set a descriptor instead of 'None' to be more flexible in
# dtypes computing
try:
self._dtypes = ModinDtypes(
DtypesDescriptor(
cols_with_unknown_dtypes=self.columns.tolist(), parent_df=self
)
)
except NotImplementedError:
self._dtypes = None
elif isinstance(dtypes, ModinDtypes) or dtypes is None:
self._dtypes = dtypes
else:
self._dtypes = ModinDtypes(dtypes)
Expand All @@ -354,6 +369,18 @@ def dtypes(self):
self.set_dtypes_cache(dtypes)
return dtypes

def get_dtypes_set(self):
    """
    Collect the distinct dtypes present in this dataframe.

    Returns
    -------
    set
        The result of ``ModinDtypes.get_dtypes_set()`` when the dtypes cache
        (``self._dtypes``) is a ``ModinDtypes`` instance; otherwise a set
        built from the values of ``self.dtypes``.
    """
    cached = self._dtypes
    if not isinstance(cached, ModinDtypes):
        # No ModinDtypes cache to delegate to: go through the `dtypes`
        # property and deduplicate its values.
        return set(self.dtypes.values)
    return cached.get_dtypes_set()

def _compute_dtypes(self, columns=None):
"""
Compute the data types via TreeReduce pattern for the specified columns.
Expand All @@ -376,7 +403,13 @@ def dtype_builder(df):
if columns is not None:
# Sorting positions to request columns in the order they're stored (it's more efficient)
numeric_indices = sorted(self.columns.get_indexer_for(columns))
obj = self._take_2d_positional(col_positions=numeric_indices)
if any(pos < 0 for pos in numeric_indices):
raise KeyError(
f"Some of the columns are not in index: subset={columns}; columns={self.columns}"
)
obj = self.take_2d_labels_or_positional(
col_labels=self.columns[numeric_indices].tolist()
)
else:
obj = self

Expand Down Expand Up @@ -675,8 +708,11 @@ def _set_columns(self, new_columns):
):
return
new_columns = self._validate_set_axis(new_columns, self._columns_cache)
if self.has_materialized_dtypes:
self.dtypes.index = new_columns
if isinstance(self._dtypes, ModinDtypes):
new_value = self._dtypes.set_index(new_columns)
self.set_dtypes_cache(new_value)
elif isinstance(self._dtypes, pandas.Series):
self.dtypes.index = new_columns
self.set_columns_cache(new_columns)
self.synchronize_labels(axis=1)

Expand Down Expand Up @@ -1146,6 +1182,14 @@ def _take_2d_positional(

if self.has_materialized_dtypes:
new_dtypes = self.dtypes.iloc[monotonic_col_idx]
elif isinstance(self._dtypes, ModinDtypes):
try:
new_dtypes = self._dtypes.lazy_get(
monotonic_col_idx, numeric_index=True
)
# can raise either on missing cache or on duplicated labels
except (ValueError, NotImplementedError):
new_dtypes = None
else:
new_dtypes = None
else:
Expand Down Expand Up @@ -1441,6 +1485,12 @@ def _reorder_labels(self, row_positions=None, col_positions=None):
col_idx = self.columns[col_positions]
if self.has_materialized_dtypes:
new_dtypes = self.dtypes.iloc[col_positions]
elif isinstance(self._dtypes, ModinDtypes):
try:
new_dtypes = self._dtypes.lazy_get(col_idx)
# can raise on duplicated labels
except NotImplementedError:
new_dtypes = None

if len(col_idx) != len(self.columns):
# The frame was re-partitioned along the 1 axis during reordering using
Expand Down Expand Up @@ -3253,22 +3303,24 @@ def broadcast_apply_full_axis(
kw = {"row_lengths": None, "column_widths": None}
if isinstance(dtypes, str) and dtypes == "copy":
kw["dtypes"] = self.copy_dtypes_cache()
elif isinstance(dtypes, DtypesDescriptor):
kw["dtypes"] = ModinDtypes(dtypes)
elif dtypes is not None:
if isinstance(dtypes, (pandas.Series, ModinDtypes)):
kw["dtypes"] = dtypes.copy()
else:
if new_columns is None:
(
new_columns,
kw["column_widths"],
) = self._compute_axis_labels_and_lengths(1, new_partitions)
kw["dtypes"] = (
pandas.Series(dtypes, index=new_columns)
if is_list_like(dtypes)
else pandas.Series(
[np.dtype(dtypes)] * len(new_columns), index=new_columns
kw["dtypes"] = ModinDtypes(
DtypesDescriptor(remaining_dtype=np.dtype(dtypes))
)
else:
kw["dtypes"] = (
pandas.Series(dtypes, index=new_columns)
if is_list_like(dtypes)
else pandas.Series(
[np.dtype(dtypes)] * len(new_columns), index=new_columns
)
)
)

if not keep_partitioning:
if kw["row_lengths"] is None and new_index is not None:
Expand Down Expand Up @@ -3662,10 +3714,12 @@ def _compute_new_widths():
if all(obj.has_materialized_columns for obj in (self, *others)):
new_columns = self.columns.append([other.columns for other in others])
new_index = joined_index
if self.has_materialized_dtypes and all(
o.has_materialized_dtypes for o in others
):
new_dtypes = pandas.concat([self.dtypes] + [o.dtypes for o in others])
try:
new_dtypes = ModinDtypes.concat(
[self.copy_dtypes_cache()] + [o.copy_dtypes_cache() for o in others]
)
except NotImplementedError:
new_dtypes = None
# If we have already cached the width of each column in at least one
# of the column's partitions, we can build new_widths for the new
# frame. Typically, if we know the width for any partition in a
Expand Down
4 changes: 2 additions & 2 deletions modin/core/dataframe/pandas/metadata/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

"""Utilities and classes to handle work with metadata."""

from .dtypes import LazyProxyCategoricalDtype, ModinDtypes
from .dtypes import DtypesDescriptor, LazyProxyCategoricalDtype, ModinDtypes
from .index import ModinIndex

__all__ = ["ModinDtypes", "ModinIndex", "LazyProxyCategoricalDtype"]
__all__ = ["ModinDtypes", "ModinIndex", "LazyProxyCategoricalDtype", "DtypesDescriptor"]
Loading

0 comments on commit b7bf9b5

Please sign in to comment.