Skip to content

Commit

Permalink
FIX-modin-project#6227: Make sure Series.unique() with pyarrow dtyp…
Browse files Browse the repository at this point in the history
…e returns `ArrowExtensionArray` (modin-project#7042)

Signed-off-by: Anatoly Myachev <[email protected]>
  • Loading branch information
anmyachev authored Apr 15, 2024
1 parent 8a9308a commit c31feba
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 8 deletions.
14 changes: 12 additions & 2 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2018,7 +2018,10 @@ def str_split(self, pat=None, n=-1, expand=False, regex=None):
def unique(self, keep="first", ignore_index=True, subset=None):
# kernels with 'pandas.Series.unique()' work faster
can_use_unique_kernel = (
subset is None and ignore_index and len(self.columns) == 1 and keep
subset is None
and ignore_index
and len(self.columns) == 1
and keep is not False
)

if not can_use_unique_kernel and not RangePartitioning.get():
Expand All @@ -2028,7 +2031,11 @@ def unique(self, keep="first", ignore_index=True, subset=None):
new_modin_frame = self._modin_frame._apply_func_to_range_partitioning(
key_columns=self.columns.tolist() if subset is None else subset,
func=(
(lambda df: pandas.DataFrame(df.squeeze(axis=1).unique()))
(
lambda df: pandas.DataFrame(
df.squeeze(axis=1).unique(), columns=["__reduced__"]
)
)
if can_use_unique_kernel
else (
lambda df: df.drop_duplicates(
Expand All @@ -2039,6 +2046,9 @@ def unique(self, keep="first", ignore_index=True, subset=None):
preserve_columns=True,
)
else:
# return self.to_pandas().squeeze(axis=1).unique() works faster
# but returns pandas type instead of query compiler
# TODO: https://github.com/modin-project/modin/issues/7182
new_modin_frame = self._modin_frame.apply_full_axis(
0,
lambda x: x.squeeze(axis=1).unique(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2862,7 +2862,7 @@ def to_pandas(self):
obj = obj.cast(schema)
# concatenate() is called by _partition_mgr_cls.to_pandas
# to preserve the categorical dtypes
df = concatenate([arrow_to_pandas(obj)])
df = concatenate([arrow_to_pandas(obj, self._dtypes)])
else:
df = obj.copy()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@

"""Utilities for internal use by the ``HdkOnNativeDataframe``."""

from __future__ import annotations

import re
from functools import lru_cache
from typing import Any, Dict, List, Tuple, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas
Expand All @@ -27,6 +29,9 @@
from modin.pandas.indexing import is_range_like
from modin.utils import MODIN_UNNAMED_SERIES_LABEL

if TYPE_CHECKING:
from modin.core.dataframe.pandas.metadata import ModinDtypes

EMPTY_ARROW_TABLE = pa.Table.from_pandas(pandas.DataFrame({}))


Expand Down Expand Up @@ -579,14 +584,18 @@ def ensure_supported_dtype(dtype):
raise NotImplementedError(f"Type {dtype}")


def arrow_to_pandas(at: pa.Table) -> pandas.DataFrame:
def arrow_to_pandas(
at: pa.Table, dtypes: Optional[Union[ModinDtypes, pandas.Series]] = None
) -> pandas.DataFrame:
"""
Convert the specified arrow table to pandas.
Parameters
----------
at : pyarrow.Table
The table to convert.
dtypes : Union[ModinDtypes, pandas.Series], optional
Dtypes are used to correctly map PyArrow types to pandas.
Returns
-------
Expand All @@ -597,6 +606,28 @@ def mapper(at):
if is_dictionary(at) and isinstance(at.value_type, ArrowIntervalType):
# The default mapper fails with TypeError: unhashable type: 'dict'
return _CategoricalDtypeMapper
elif dtypes is not None and any(
(
isinstance(dtype, pandas.core.dtypes.dtypes.ArrowDtype)
for dtype in dtypes
)
):
# for pandas types that are backed by pyarrow, for example: uint8[pyarrow]
dtype_mapping = {
pa.int8(): pandas.ArrowDtype(pa.int8()),
pa.int16(): pandas.ArrowDtype(pa.int16()),
pa.int32(): pandas.ArrowDtype(pa.int32()),
pa.int64(): pandas.ArrowDtype(pa.int64()),
pa.uint8(): pandas.ArrowDtype(pa.uint8()),
pa.uint16(): pandas.ArrowDtype(pa.uint16()),
pa.uint32(): pandas.ArrowDtype(pa.uint32()),
pa.uint64(): pandas.ArrowDtype(pa.uint64()),
pa.bool_(): pandas.ArrowDtype(pa.bool_()),
pa.float32(): pandas.ArrowDtype(pa.float32()),
pa.float64(): pandas.ArrowDtype(pa.float64()),
pa.string(): pandas.ArrowDtype(pa.string()),
}
return dtype_mapping.get(at, None)
return None

df = at.to_pandas(types_mapper=mapper)
Expand Down
10 changes: 7 additions & 3 deletions modin/pandas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -2107,9 +2107,13 @@ def unique(self) -> ArrayLike: # noqa: RT01, D200
"""
Return unique values of Series object.
"""
return self.__constructor__(
query_compiler=self._query_compiler.unique()
).to_numpy()
# `values` can't be used here because it performs unnecessary conversion,
# after which the result type does not match the pandas
return (
self.__constructor__(query_compiler=self._query_compiler.unique())
.modin.to_pandas()
._values
)

def update(self, other) -> None: # noqa: PR01, D200
"""
Expand Down
23 changes: 23 additions & 0 deletions modin/tests/pandas/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -3691,18 +3691,21 @@ def test_unique(data):
pandas_result = pandas_series.unique()
comparator(modin_result, pandas_result)
assert modin_result.shape == pandas_result.shape
assert type(modin_result) is type(pandas_result)

modin_result = pd.Series([2, 1, 3, 3], name="A").unique()
pandas_result = pandas.Series([2, 1, 3, 3], name="A").unique()
comparator(modin_result, pandas_result)
assert modin_result.shape == pandas_result.shape
assert type(modin_result) is type(pandas_result)

modin_result = pd.Series([pd.Timestamp("2016-01-01") for _ in range(3)]).unique()
pandas_result = pandas.Series(
[pd.Timestamp("2016-01-01") for _ in range(3)]
).unique()
comparator(modin_result, pandas_result)
assert modin_result.shape == pandas_result.shape
assert type(modin_result) is type(pandas_result)

modin_result = pd.Series(
[pd.Timestamp("2016-01-01", tz="US/Eastern") for _ in range(3)]
Expand All @@ -3712,11 +3715,13 @@ def test_unique(data):
).unique()
comparator(modin_result, pandas_result)
assert modin_result.shape == pandas_result.shape
assert type(modin_result) is type(pandas_result)

modin_result = pandas.Series(pd.Categorical(list("baabc"))).unique()
pandas_result = pd.Series(pd.Categorical(list("baabc"))).unique()
comparator(modin_result, pandas_result)
assert modin_result.shape == pandas_result.shape
assert type(modin_result) is type(pandas_result)

modin_result = pd.Series(
pd.Categorical(list("baabc"), categories=list("abc"), ordered=True)
Expand All @@ -3726,6 +3731,24 @@ def test_unique(data):
).unique()
comparator(modin_result, pandas_result)
assert modin_result.shape == pandas_result.shape
assert type(modin_result) is type(pandas_result)


def test_unique_pyarrow_dtype():
# See #6227 for details
modin_series, pandas_series = create_test_series(
[1, 0, pd.NA], dtype="uint8[pyarrow]"
)

def comparator(df1, df2):
# Perform our own non-strict version of dtypes equality check
df_equals(df1, df2)
# to be sure `unique` return `ArrowExtensionArray`
assert type(df1) is type(df2)

eval_general(
modin_series, pandas_series, lambda df: df.unique(), comparator=comparator
)


@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)
Expand Down

0 comments on commit c31feba

Please sign in to comment.