Skip to content

Commit

Permalink
PERF-#6754: Merge partial dtype caches on '.concat(axis=0)' (#6759)
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <[email protected]>
  • Loading branch information
dchigarev authored Nov 21, 2023
1 parent ed2e67b commit 794ac6f
Show file tree
Hide file tree
Showing 3 changed files with 263 additions and 19 deletions.
11 changes: 1 addition & 10 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3695,16 +3695,7 @@ def _compute_new_widths():
new_index = self.index.append([other.index for other in others])
new_columns = joined_index
frames = [self] + others
if all(frame.has_materialized_dtypes for frame in frames):
all_dtypes = [frame.dtypes for frame in frames]
if not all(dtypes.empty for dtypes in all_dtypes):
new_dtypes = pandas.concat(all_dtypes, axis=1)
# 'nan' value will be placed in a row if a column doesn't exist in all frames;
# this value is np.float64 type so we need an explicit conversion
new_dtypes.fillna(np.dtype("float64"), inplace=True)
new_dtypes = new_dtypes.apply(
lambda row: find_common_type(row.values), axis=1
)
new_dtypes = ModinDtypes.concat([frame._dtypes for frame in frames], axis=1)
# If we have already cached the length of each row in at least one
# of the row's partitions, we can build new_lengths for the new
# frame. Typically, if we know the length for any partition in a
Expand Down
128 changes: 120 additions & 8 deletions modin/core/dataframe/pandas/metadata/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import numpy as np
import pandas
from pandas._typing import IndexLabel
from pandas.core.dtypes.cast import find_common_type

if TYPE_CHECKING:
from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe
Expand Down Expand Up @@ -446,20 +447,120 @@ def get_dtypes_set(self) -> set[np.dtype]:
return known_dtypes

@classmethod
def concat(
def _merge_dtypes(
cls, values: list[Union["DtypesDescriptor", pandas.Series, None]]
) -> "DtypesDescriptor": # noqa: GL08
) -> "DtypesDescriptor":
"""
Union columns described by ``values`` and compute common dtypes for them.
Parameters
----------
values : list of DtypesDescriptors, pandas.Series or Nones
Returns
-------
DtypesDescriptor
"""
known_dtypes = {}
cols_with_unknown_dtypes = []
know_all_names = True
dtypes_are_unknown = False

# index - joined column names, columns - dtypes taken from 'values'
# 0 1 2 3
# col1 int bool float int
# col2 int int int int
# colN bool bool bool int
dtypes_matrix = pandas.DataFrame()

for i, val in enumerate(values):
if isinstance(val, cls):
know_all_names &= val._know_all_names
dtypes = val._known_dtypes.copy()
dtypes.update({col: "unknown" for col in val._cols_with_unknown_dtypes})
if val._remaining_dtype is not None:
# we can't process remaining dtypes, so just discarding them
know_all_names = False

# setting a custom name to the Series to prevent duplicated names
# in the 'dtypes_matrix'
series = pandas.Series(dtypes, name=i)
dtypes_matrix = pandas.concat([dtypes_matrix, series], axis=1)
dtypes_matrix.fillna(
value={
                        # If we encountered a 'NaN' while 'val' describes all the columns, then
                        # it means that the missing columns for this instance will be filled with NaNs (floats);
                        # otherwise, it may indicate missing columns that this 'val' has no info about,
                        # meaning that we shouldn't try computing a new dtype for this column,
                        # so marking it as 'unknown'
i: np.dtype(float)
if val._know_all_names and val._remaining_dtype is None
else "unknown"
},
inplace=True,
)
elif isinstance(val, pandas.Series):
dtypes_matrix = pandas.concat([dtypes_matrix, val], axis=1)
elif val is None:
                # one of the 'dtypes' is None, meaning that we won't be able to infer a valid result dtype;
                # however, we keep iterating so that we at least learn which columns we're missing
                # dtypes for
dtypes_are_unknown = True
know_all_names = False
else:
raise NotImplementedError(type(val))

if dtypes_are_unknown:
return DtypesDescriptor(
cols_with_unknown_dtypes=dtypes_matrix.index,
know_all_names=know_all_names,
)

def combine_dtypes(row):
if (row == "unknown").any():
return "unknown"
row = row.fillna(np.dtype("float"))
return find_common_type(list(row.values))

dtypes = dtypes_matrix.apply(combine_dtypes, axis=1)

for col, dtype in dtypes.items():
if dtype == "unknown":
cols_with_unknown_dtypes.append(col)
else:
known_dtypes[col] = dtype

return DtypesDescriptor(
known_dtypes,
cols_with_unknown_dtypes,
remaining_dtype=None,
know_all_names=know_all_names,
)

@classmethod
def concat(
cls, values: list[Union["DtypesDescriptor", pandas.Series, None]], axis: int = 0
) -> "DtypesDescriptor":
"""
Concatenate dtypes descriptors into a single descriptor.
Parameters
----------
values : list of DtypesDescriptors and pandas.Series
axis : int, default: 0
If ``axis == 0``: concatenate column names. This implements the logic of
how dtypes are combined on ``pd.concat([df1, df2], axis=1)``.
            If ``axis == 1``: perform a union join of the column names described by
            `values` and then find common dtypes for the columns that appear in
            the intersection. This implements the logic of how dtypes are combined on
            ``pd.concat([df1, df2], axis=0).dtypes``.
Returns
-------
DtypesDescriptor
"""
if axis == 1:
return cls._merge_dtypes(values)
known_dtypes = {}
cols_with_unknown_dtypes = []
schema_is_known = True
Expand Down Expand Up @@ -636,13 +737,20 @@ def lazy_get(self, ids: list, numeric_index: bool = False) -> "ModinDtypes":
return ModinDtypes(self._value.iloc[ids] if numeric_index else self._value[ids])

@classmethod
def concat(cls, values: list) -> "ModinDtypes":
def concat(cls, values: list, axis: int = 0) -> "ModinDtypes":
"""
Concatenate dtypes..
Concatenate dtypes.
Parameters
----------
values : list of DtypesDescriptors, pandas.Series, ModinDtypes and Nones
axis : int, default: 0
If ``axis == 0``: concatenate column names. This implements the logic of
how dtypes are combined on ``pd.concat([df1, df2], axis=1)``.
            If ``axis == 1``: perform a union join of the column names described by
            `values` and then find common dtypes for the columns that appear in
            the intersection. This implements the logic of how dtypes are combined on
            ``pd.concat([df1, df2], axis=0).dtypes``.
Returns
-------
Expand All @@ -658,12 +766,16 @@ def concat(cls, values: list) -> "ModinDtypes":
raise NotImplementedError(type(val))

try:
desc = DtypesDescriptor.concat(preprocessed_vals)
desc = DtypesDescriptor.concat(preprocessed_vals, axis=axis)
except NotImplementedError as e:
# 'DtypesDescriptor' doesn't support duplicated labels, however, if all values are pandas Serieses,
# 'DtypesDescriptor' doesn't support duplicated labels, however, if all values are pandas Series,
# we still can perform concatenation using pure pandas
if "duplicated" not in e.args[0].lower() or not all(
isinstance(val, pandas.Series) for val in values
if (
# 'pd.concat(axis=1)' fails on duplicated labels anyway, so doing this logic
# only in case 'axis=0'
axis == 0
and "duplicated" not in e.args[0].lower()
or not all(isinstance(val, pandas.Series) for val in values)
):
raise e
desc = pandas.concat(values)
Expand Down
143 changes: 142 additions & 1 deletion modin/test/storage_formats/pandas/test_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -1729,7 +1729,7 @@ def test_lazy_get_desc(self):
)
assert res.equals(exp)

def test_concat(self):
def test_concat_axis_0(self):
res = DtypesDescriptor.concat(
[
DtypesDescriptor(self.schema[["a", "b"]]),
Expand Down Expand Up @@ -1815,6 +1815,147 @@ def test_concat(self):
)
assert res.equals(exp)

@pytest.mark.parametrize(
"initial_dtypes, result_cols_with_known_dtypes, result_cols_with_unknown_dtypes",
[
[
# initial dtypes (cols_with_known_dtypes, cols_with_unknown_dtypes, remaining_dtype):
# dtypes for all columns are known
[
(["a", "b", "c", "d"], [], None),
(["a", "b", "e", "d"], [], None),
(["a", "b"], [], None),
],
# result_cols_with_known_dtypes:
# all dtypes were known in the beginning, expecting the same
# for the result
["a", "b", "c", "d", "e"],
# result_cols_with_unknown_dtypes
[],
],
[
# initial dtypes (cols_with_known_dtypes, cols_with_unknown_dtypes, remaining_dtype)
[
(["a", "b"], ["c", "d"], None),
(["a", "b", "d"], ["e"], None),
(["a", "b"], [], None),
],
# result_cols_with_known_dtypes:
# across all dataframes, dtypes were only known for 'a' and 'b' columns
["a", "b"],
# result_cols_with_unknown_dtypes
["c", "d", "e"],
],
[
# initial dtypes (cols_with_known_dtypes, cols_with_unknown_dtypes, remaining_dtype):
# the 'e' column in the second frame is missing here, emulating 'know_all_names=False' case
[
(["a", "b"], ["c", "d"], None),
(["a", "b", "d"], [], None),
(["a", "b"], [], None),
],
# result_cols_with_known_dtypes
["a", "b"],
# result_cols_with_unknown_dtypes:
                # the missing 'e' column will be deduced from the resulting frame after '.concat()'
["c", "d", "e"],
],
[
# initial dtypes (cols_with_known_dtypes, cols_with_unknown_dtypes, remaining_dtype)
# the 'c' column in the first frame is described using 'remaining_dtype'
[
(["a", "b", "d"], [], np.dtype(bool)),
(["a", "b", "e", "d"], [], None),
(["a", "b"], [], None),
],
# result_cols_with_known_dtypes:
# remaining dtypes are not supported by 'concat(axis=0)', so dtype for the 'c'
# column is missing here
["a", "b", "e", "d"],
# result_cols_with_unknown_dtypes:
["c"],
],
],
)
def test_concat_axis_1(
self,
initial_dtypes,
result_cols_with_known_dtypes,
result_cols_with_unknown_dtypes,
):
"""
Test that ``DtypesDescriptor.concat(axis=1)`` works as expected.
Parameters
----------
initial_dtypes : list of tuples: (cols_with_known_dtypes, cols_with_unknown_dtypes, remaining_dtype)
Describe how to build ``DtypesDescriptor`` for each of the three dataframes.
result_cols_with_known_dtypes : list of labels
            Column names for which dtypes have to be determined after ``.concat()``.
        result_cols_with_unknown_dtypes : list of labels
            Column names for which dtypes have to be unknown after ``.concat()``.
"""
md_df1, pd_df1 = create_test_dfs(
{
"a": [1, 2, 3],
"b": [3.5, 4.5, 5.5],
"c": [True, False, True],
"d": ["a", "b", "c"],
}
)
md_df2, pd_df2 = create_test_dfs(
{
"a": [1.5, 2.5, 3.5],
"b": [3.5, 4.5, 5.5],
"e": [True, False, True],
"d": ["a", "b", "c"],
}
)
md_df3, pd_df3 = create_test_dfs({"a": [1, 2, 3], "b": [3.5, 4.5, 5.5]})

for md_df, (known_cols, unknown_cols, remaining_dtype) in zip(
[md_df1, md_df2, md_df3], initial_dtypes
):
known_dtypes = {col: md_df.dtypes[col] for col in known_cols}
know_all_names = (
len(known_cols) + len(unknown_cols) == len(md_df.columns)
or remaining_dtype is not None
)
# setting columns cache to 'None', in order to prevent completing 'dtypes' with the materialized columns
md_df._query_compiler._modin_frame.set_columns_cache(None)
md_df._query_compiler._modin_frame.set_dtypes_cache(
ModinDtypes(
DtypesDescriptor(
known_dtypes,
unknown_cols,
remaining_dtype,
know_all_names=know_all_names,
)
)
)
md_dtypes = pd.concat(
[md_df1, md_df2, md_df3]
)._query_compiler._modin_frame._dtypes
pd_dtypes = pandas.concat([pd_df1, pd_df2, pd_df3]).dtypes
if len(result_cols_with_known_dtypes) == len(pd_dtypes):
md_dtypes = (
md_dtypes if isinstance(md_dtypes, pandas.Series) else md_dtypes._value
)
assert isinstance(md_dtypes, pandas.Series)
assert md_dtypes.equals(pd_dtypes)
else:
assert set(md_dtypes._value._known_dtypes.keys()) == set(
result_cols_with_known_dtypes
)
# reindexing to ensure proper order
md_known_dtypes = pandas.Series(md_dtypes._value._known_dtypes).reindex(
result_cols_with_known_dtypes
)
assert md_known_dtypes.equals(pd_dtypes[result_cols_with_known_dtypes])
assert set(md_dtypes._value._cols_with_unknown_dtypes) == set(
result_cols_with_unknown_dtypes
)

def test_ModinDtypes_duplicated_concat(self):
# test that 'ModinDtypes' is able to perform dtypes concatenation on duplicated labels
        # if all of them are pandas Series objects
Expand Down

0 comments on commit 794ac6f

Please sign in to comment.