From 73fe01e4e2b7ecf94ac62e66ddffdb6a4984e0d0 Mon Sep 17 00:00:00 2001 From: Andrey Pavlenko Date: Thu, 2 Nov 2023 14:49:21 +0100 Subject: [PATCH 1/2] PERF-#6696: Use cached dtypes in fillna when possible. Signed-off-by: Andrey Pavlenko --- .../dataframe/pandas/dataframe/dataframe.py | 7 +++++- .../storage_formats/pandas/query_compiler.py | 22 ++++++++++++++++++- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 33ae3e3249b..1e224484993 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -2058,6 +2058,7 @@ def map( func: Callable, dtypes: Optional[str] = None, new_columns: Optional[pandas.Index] = None, + lazy=False, ) -> "PandasDataframe": """ Perform a function that maps across the entire dataset. @@ -2073,13 +2074,17 @@ def map( new_columns : pandas.Index, optional New column labels of the result, its length has to be identical to the older columns. If not specified, old column labels are preserved. + lazy : bool, default: False + Perform the function lazily. Returns ------- PandasDataframe A new dataframe. """ - new_partitions = self._partition_mgr_cls.map_partitions(self._partitions, func) + mgr_cls = self._partition_mgr_cls + map_func = mgr_cls.lazy_map_partitions if lazy else mgr_cls.map_partitions + new_partitions = map_func(self._partitions, func) if new_columns is not None and self.has_materialized_columns: assert len(new_columns) == len( self.columns diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 99dcb2f77e4..b3c07860031 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -2420,6 +2420,7 @@ def fillna(self, **kwargs): method = kwargs.get("method", None) limit = kwargs.get("limit", None) full_axis = method is not None or limit is not None + new_dtypes = None if isinstance(value, BaseQueryCompiler): if squeeze_self: # Self is a Series type object @@ -2487,7 +2488,24 @@ def fillna(df): } return df.fillna(value=func_dict, **kwargs) + if self._modin_frame.has_materialized_dtypes: + dtypes = self._modin_frame.dtypes + if all( + find_common_type([dtypes[c], t]) == dtypes[c] + for (c, t) in pandas.DataFrame( + {k: [v] for (k, v) in value.items()} + ).dtypes.items() + if c in dtypes + ): + new_dtypes = dtypes + else: + if self._modin_frame.has_materialized_dtypes: + dtype = pandas.Series(value).dtype + if all( + find_common_type([t, dtype]) == t for t in self._modin_frame.dtypes + ): + new_dtypes = self._modin_frame.dtypes def fillna(df): return df.fillna(value=value, **kwargs) @@ -2495,7 +2513,9 @@ def fillna(df): if full_axis: new_modin_frame = self._modin_frame.fold(axis, fillna) else: - new_modin_frame = self._modin_frame.map(fillna) + new_modin_frame = self._modin_frame.map( + fillna, dtypes=new_dtypes, lazy=True + ) return self.__constructor__(new_modin_frame) def quantile_for_list_of_values(self, **kwargs): From 92640b19a46405baab1438d953ab7cebe8ccf3bb Mon Sep 17 00:00:00 2001 From: Andrey Pavlenko Date: Thu, 9 Nov 2023 09:38:47 +0100 Subject: [PATCH 2/2] Apply suggestions from code review --- .../core/dataframe/pandas/dataframe/dataframe.py | 7 +------ .../core/storage_formats/pandas/query_compiler.py | 15 +++++++-------- 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 1e224484993..33ae3e3249b 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -2058,7 +2058,6 @@ def map( func: Callable, dtypes: Optional[str] = None, new_columns: Optional[pandas.Index] = None, - lazy=False, ) -> "PandasDataframe": """ Perform a function that maps across the entire dataset. @@ -2074,17 +2073,13 @@ def map( new_columns : pandas.Index, optional New column labels of the result, its length has to be identical to the older columns. If not specified, old column labels are preserved. - lazy : bool, default: False - Perform the function lazily. Returns ------- PandasDataframe A new dataframe. """ - mgr_cls = self._partition_mgr_cls - map_func = mgr_cls.lazy_map_partitions if lazy else mgr_cls.map_partitions - new_partitions = map_func(self._partitions, func) + new_partitions = self._partition_mgr_cls.map_partitions(self._partitions, func) if new_columns is not None and self.has_materialized_columns: assert len(new_columns) == len( self.columns diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index b3c07860031..1e083a33ebf 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -2490,12 +2490,13 @@ def fillna(df): if self._modin_frame.has_materialized_dtypes: dtypes = self._modin_frame.dtypes + value_dtypes = pandas.DataFrame( + {k: [v] for (k, v) in value.items()} + ).dtypes if all( - find_common_type([dtypes[c], t]) == dtypes[c] - for (c, t) in pandas.DataFrame( - {k: [v] for (k, v) in value.items()} - ).dtypes.items() - if c in dtypes + find_common_type([dtypes[col], dtype]) == dtypes[col] + for (col, dtype) in value_dtypes.items() + if col in dtypes ): new_dtypes = dtypes @@ -2513,9 +2514,7 @@ def fillna(df): if full_axis: new_modin_frame = self._modin_frame.fold(axis, fillna) else: - new_modin_frame = self._modin_frame.map( - fillna, dtypes=new_dtypes, lazy=True - ) + new_modin_frame = self._modin_frame.map(fillna, dtypes=new_dtypes) return self.__constructor__(new_modin_frame) def quantile_for_list_of_values(self, **kwargs):