diff --git a/CHANGES.rst b/CHANGES.rst
index 673b64da4..9a8436cdd 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -32,6 +32,13 @@ Major changes
 * Parallelized the :class:`GapEncoder` column-wise. Parameters `n_jobs` and `verbose`
   added to the signature. :pr:`582` by :user:`Lilian Boulard `
 
+
+Minor changes
+-------------
+
+* :class:`TableVectorizer` can now apply parallelism at the column level rather than
+  the transformer level. This is the default for univariate transformers such as
+  :class:`MinHashEncoder` and :class:`GapEncoder`.
+  :pr:`592` by :user:`Leo Grinsztajn `
+
 * Parallelized the :func:`deduplicate` function. Parameter `n_jobs` added to the
   signature. :pr:`618` by :user:`Jovan Stojanovic ` and :user:`Lilian Boulard `
@@ -174,6 +181,8 @@ Minor changes
   :pr:`543` by :user:`Leo Grinsztajn `
   :pr:`587` by :user:`Leo Grinsztajn `
 
+
+
 Dirty-cat Release 0.4.0
 =========================
diff --git a/skrub/_gap_encoder.py b/skrub/_gap_encoder.py
index ecf60400f..2c78b98fc 100644
--- a/skrub/_gap_encoder.py
+++ b/skrub/_gap_encoder.py
@@ -1,6 +1,7 @@
 """
 Implements the GapEncoder: a probabilistic encoder for categorical variables.
 """
+from __future__ import annotations
 
 from collections.abc import Generator
 from copy import deepcopy
@@ -13,7 +14,7 @@
 from numpy.random import RandomState
 from numpy.typing import ArrayLike, NDArray
 from scipy import sparse
-from sklearn.base import BaseEstimator, TransformerMixin
+from sklearn.base import BaseEstimator, TransformerMixin, clone
 from sklearn.cluster import KMeans, kmeans_plusplus
 from sklearn.decomposition._nmf import _beta_divergence
 from sklearn.feature_extraction.text import CountVectorizer, HashingVectorizer
@@ -739,6 +740,43 @@ class GapEncoder(TransformerMixin, BaseEstimator):
     fitted_models_: list[GapEncoderColumn]
     column_names_: list[str]
 
+    @classmethod
+    def _merge(cls, transformers_list: list[GapEncoder]) -> GapEncoder:
+        """
+        Merge GapEncoders fitted on different columns
+        into a single GapEncoder. This is useful for parallelization
+        over columns in the TableVectorizer.
+        """
+        full_transformer = clone(transformers_list[0])
+        # rho_ is assumed to be identical across the fitted transformers,
+        # so we take it from the first one
+        rho_ = transformers_list[0].rho_
+        full_transformer.rho_ = rho_
+        full_transformer.fitted_models_ = []
+        for transformer in transformers_list:
+            full_transformer.fitted_models_.extend(transformer.fitted_models_)
+        if hasattr(transformers_list[0], "column_names_"):
+            full_transformer.column_names_ = []
+            for transformer in transformers_list:
+                full_transformer.column_names_.extend(transformer.column_names_)
+        return full_transformer
+
+    def _split(self) -> list[GapEncoder]:
+        """
+        Split a GapEncoder fitted on multiple columns
+        into a list of GapEncoders fitted on one column each.
+        This is useful for parallelizing transform over columns
+        in the TableVectorizer.
+        """
+        check_is_fitted(self)
+        transformers_list = []
+        for i, model in enumerate(self.fitted_models_):
+            transformer = clone(self)
+            transformer.rho_ = model.rho_
+            transformer.fitted_models_ = [model]
+            transformer.column_names_ = [self.column_names_[i]]
+            transformers_list.append(transformer)
+        return transformers_list
+
     def __init__(
         self,
         *,
@@ -1025,6 +1063,10 @@ def _more_tags(self):
                 ),
                 "check_estimators_dtypes": "We only support string dtypes.",
             },
+            "univariate": True,  # whether the estimator is univariate and can be
+            # applied column by column.
+            # This is useful for the TableVectorizer,
+            # to decide whether to apply the transformer on each column separately
+            # and thus improve the parallelization when the transformer is slow enough.
         }
diff --git a/skrub/_minhash_encoder.py b/skrub/_minhash_encoder.py
index 43515d440..1c5b9d7be 100644
--- a/skrub/_minhash_encoder.py
+++ b/skrub/_minhash_encoder.py
@@ -2,6 +2,7 @@
 Implements the MinHashEncoder, which encodes string categorical features by
 applying the MinHash method to n-gram decompositions of strings.
 """
+from __future__ import annotations
 
 from collections.abc import Callable, Collection
 from typing import Literal
@@ -9,13 +10,13 @@
 import numpy as np
 from joblib import Parallel, delayed, effective_n_jobs
 from numpy.typing import ArrayLike, NDArray
-from sklearn.base import BaseEstimator, TransformerMixin
+from sklearn.base import BaseEstimator, TransformerMixin, clone
 from sklearn.utils import gen_even_slices, murmurhash3_32
 from sklearn.utils.validation import _check_feature_names_in, check_is_fitted
 
 from ._fast_hash import ngram_min_hash
 from ._string_distances import get_unique_ngrams
-from ._utils import LRUDict, check_input
+from ._utils import LRUDict, check_input, combine_lru_dicts
 
 NoneType = type(None)
@@ -120,6 +121,47 @@ class MinHashEncoder(TransformerMixin, BaseEstimator):
 
     _capacity: int = 2**10
 
+    @classmethod
+    def _merge(cls, transformers_list: list[MinHashEncoder]) -> MinHashEncoder:
+        """
+        Merge MinHashEncoders fitted on different columns
+        into a single MinHashEncoder. This is useful for parallelization
+        over columns in the TableVectorizer.
+        """
+        full_transformer = clone(transformers_list[0])
+        capacity = transformers_list[0]._capacity
+        full_transformer.hash_dict_ = combine_lru_dicts(
+            capacity, *[transformer.hash_dict_ for transformer in transformers_list]
+        )
+        full_transformer.n_features_in_ = sum(
+            transformer.n_features_in_ for transformer in transformers_list
+        )
+        full_transformer.feature_names_in_ = np.concatenate(
+            [transformer.feature_names_in_ for transformer in transformers_list]
+        )
+        return full_transformer
+
+    def _split(self) -> list[MinHashEncoder]:
+        """
+        Split a MinHashEncoder fitted on multiple columns
+        into a list of MinHashEncoders (one for each column).
+        This is useful for parallelizing transform over columns
+        in the TableVectorizer.
+        """
+        check_is_fitted(self)
+        transformer_list = []
+        for i in range(self.n_features_in_):
+            trans = clone(self)
+            attributes = ["hash_dict_", "_capacity"]
+            for a in attributes:
+                if hasattr(self, a):
+                    setattr(trans, a, getattr(self, a))
+            # TODO: do we want to deepcopy hash_dict_?
+            trans.n_features_in_ = 1
+            trans.feature_names_in_ = np.array([self.feature_names_in_[i]])
+            transformer_list.append(trans)
+        return transformer_list
+
     def __init__(
         self,
         *,
@@ -395,4 +437,8 @@ def _more_tags(self):
                 ),
                 "check_estimators_dtypes": "We only support string dtypes.",
             },
+            "univariate": True,  # whether the estimator is univariate and can be
+            # applied column by column. This is useful for the TableVectorizer,
+            # to decide whether to apply the transformer on each column separately
+            # and thus improve the parallelization when the transformer is slow enough.
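+            # NOTE: the TableVectorizer reads this tag through `_get_tags()` in
+            # `_parallel_on_columns` before deciding to split the transformer.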
         }
diff --git a/skrub/_table_vectorizer.py b/skrub/_table_vectorizer.py
index 2ce1f5e2e..751a49272 100644
--- a/skrub/_table_vectorizer.py
+++ b/skrub/_table_vectorizer.py
@@ -6,6 +6,7 @@
 import warnings
 from collections import Counter
+from itertools import chain
 from typing import Literal
 from warnings import warn
@@ -150,6 +151,140 @@ def _replace_missing_in_cat_col(ser: pd.Series, value: str = "missing") -> pd.Se
 Transformer = TransformerMixin | Literal["drop", "remainder", "passthrough"]
 
 
+def _parallel_on_columns(trans: Transformer, cols: list[str]) -> bool:
+    """
+    Decide whether to parallelize the transformer over columns.
+    We only want to parallelize over columns if the transformer
+    is "univariate" (i.e. it can be duplicated for each column).
+    """
+    return (
+        (not isinstance(trans, str))
+        and trans._get_tags().get("univariate", False)
+        and len(cols) > 1
+    )
+
+
+def _split_transformers(
+    transformers: list[tuple[str, Transformer, list[str]]],
+    transformers_to_input_indices: dict[str, list[int]] | None = None,
+    during_fit: bool = False,
+) -> tuple[list[tuple[str, Transformer, list[str]]], dict[str, list[int]]]:
+    """
+    Split univariate transformers into multiple transformers, one for each
+    column. This makes it possible to reuse the parallelism of the inherited
+    `ColumnTransformer` class.
+
+    Parameters
+    ----------
+    transformers : list of 3-tuples (str, Transformer or str, list of str)
+        The collection of transformers to split, as tuples of
+        (name, transformer, column).
+    transformers_to_input_indices : dict of str to list of int, optional
+        The mapping of transformer names to the indices of the columns they were
+        fitted on. Should correspond to the `self._transformer_to_input_indices`
+        attribute. Only used when `during_fit` is False.
+    during_fit : bool, default=False
+        Whether the method is called during `fit_transform` (True) or
+        during `transform` (False). This is used to determine if the
+        transformers in `transformers` are fitted or not, i.e. whether
+        the `transformers` argument corresponds to the `self.transformers_`
+        attribute (when False) or the `self.transformers` attribute (when True).
+    """
+    new_transformers = []
+    new_transformer_to_input_indices = (
+        {} if not during_fit else transformers_to_input_indices
+    )
+    if during_fit:
+        # split a list of 3-tuples (name, transformer, columns)
+        # containing the unfitted transformers (or strings) and the columns
+        # to be fitted on.
+        for name, trans, cols in transformers:
+            if _parallel_on_columns(trans, cols):
+                for i, col in enumerate(cols):
+                    new_transformers.append((f"{name}_split_{i}", clone(trans), [col]))
+            else:
+                new_transformers.append((name, trans, cols))
+    else:
+        # split a list of 3-tuples (name, transformer, columns)
+        # containing the fitted transformers (or strings) and the columns
+        # they were fitted on.
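+        # Fitted univariate transformers are split with their own `_split`
+        # method (one fitted clone per column); the `_split_{i}` name suffix
+        # lets `_merge_transformers` regroup them afterwards.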
+        for name, trans, cols in transformers:
+            if _parallel_on_columns(trans, cols):
+                split_transformers_ = trans._split()
+                for i, (col, split_trans, input_idx) in enumerate(
+                    zip(
+                        cols,
+                        split_transformers_,
+                        transformers_to_input_indices[name],
+                    )
+                ):
+                    name_split = f"{name}_split_{i}"
+                    new_transformers.append((name_split, split_trans, [col]))
+                    new_transformer_to_input_indices[name_split] = [input_idx]
+            else:
+                new_transformers.append((name, trans, cols))
+                new_transformer_to_input_indices[name] = transformers_to_input_indices[
+                    name
+                ]
+
+    return new_transformers, new_transformer_to_input_indices
+
+
+def _merge_transformers(
+    transformers: list[tuple[str, Transformer, list[str]]],
+    is_fitted: bool,
+    transformer_to_input_indices: dict[str, list[int]] | None = None,
+) -> tuple[list[tuple[str, Transformer, list[str]]], dict[str, list[int]]]:
+    """
+    Merge transformers that were split by `_split_transformers` back into
+    single transformers.
+
+    Parameters
+    ----------
+    transformers : list of 3-tuples (str, Transformer or str, list of str)
+        The collection of transformers to merge, as tuples of
+        (name, transformer, column).
+    is_fitted : bool
+        Whether the transformers are fitted or not, i.e. whether the
+        `transformers` argument corresponds to the `self.transformers_` attribute
+        (when True) or the `self.transformers` attribute (when False).
+    transformer_to_input_indices : dict of str to list of int, optional
+        The mapping of transformer names to the indices of the columns they were
+        fitted on. Should correspond to the `self._transformer_to_input_indices`
+        attribute. Only used when `is_fitted` is True.
+    """
+    new_transformers = []
+    new_transformer_to_input_indices = {} if is_fitted else transformer_to_input_indices
+    base_names = pd.unique(
+        pd.Series([name.split("_split_")[0] for name, _, _ in transformers])
+    )
+
+    for base_name in base_names:
+        # merge all transformers with the same base name; match exact names or
+        # the `_split_` suffix so a base name that is a prefix of another
+        # (e.g. "foo" and "foobar") cannot capture the wrong transformers
+        transformers_base_name, names, columns = [], [], []
+        for name, trans, cols in transformers:
+            if name == base_name or name.startswith(f"{base_name}_split_"):
+                columns.extend(cols)
+                transformers_base_name.append(trans)
+                names.append(name)
+
+        new_trans = transformers_base_name[0]
+        if not is_fitted:
+            if isinstance(new_trans, TransformerMixin):
+                new_trans = clone(new_trans)
+        else:
+            if len(transformers_base_name) > 1:
+                # merge transformers
+                new_trans = new_trans.__class__._merge(transformers_base_name)
+            new_transformer_to_input_indices[base_name] = list(
+                chain.from_iterable(
+                    [transformer_to_input_indices[name] for name in names]
+                )
+            )
+        new_transformers.append((base_name, new_trans, columns))
+
+    return new_transformers, new_transformer_to_input_indices
+
+
 class TableVectorizer(ColumnTransformer):
     """Automatically transform a heterogeneous dataframe to a numerical array.
@@ -359,7 +494,7 @@ class TableVectorizer(ColumnTransformer):
     ]
     """
 
-    transformers_: list[tuple[str, str | TransformerMixin, list[str]]]
+    transformers_: list[tuple[str, Transformer, list[str]]]
     columns_: pd.Index
     types_: dict[str, type]
     imputed_columns_: list[str]
@@ -369,6 +504,8 @@ class TableVectorizer(ColumnTransformer):
     datetime_transformer_: Transformer
     specific_transformers_: list[tuple[str, Transformer, list[str, int]]]
 
+    _transformer_to_input_indices: dict[str, list[int]]
+
     # Override required parameters
     _required_parameters = []
 
@@ -428,6 +565,13 @@ def _more_tags(self) -> dict:
             },
         }
 
+    @property
+    def is_parallelized(self) -> bool:
+        """
+        Returns True if the transformers are parallelized over columns, False otherwise.
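+
+        Examples
+        --------
+        Assuming the default ``n_jobs=None``:
+
+        >>> from skrub import TableVectorizer
+        >>> TableVectorizer().is_parallelized
+        False
+        >>> TableVectorizer(n_jobs=2).is_parallelized
+        True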
+ """ + return self.n_jobs not in (None, 1) + def _clone_transformers(self) -> None: """ For each of the different transformers that can be passed, @@ -532,6 +676,57 @@ def _clone_transformers(self) -> None: # TODO: check that the provided transformers are valid + def _split_univariate_transformers(self, during_fit: bool = False): + """ + Split univariate transformers into multiple transformers, one for each + column. This is useful to use the inherited `ColumnTransformer` class + parallelism. + + Parameters + ---------- + during_fit : bool, default=False + Whether the method is called during `fit_transform` (True) or + during `transform` (False). This is used to determine whether + to split the self.transformers_ attribute (when False) or the + self.transformers attribute (when True). + """ + if during_fit: + # split self.transformers, a list of 3-tuples (name, transformer, columns) + # containing the unfitted transformers (or strings) and the columns + # to be fitted on. This attribute is used by the `ColumnTransformer` + # when calling `fit` and `fit_transform`. + self.transformers, _ = _split_transformers( + self.transformers, during_fit=True + ) + else: + # split self.transformers_, a list of 3-tuples (name, transformer, columns) + # containing the fitted transformers (or strings) and the columns + # they were fitted on. This attribute is used by the `ColumnTransformer` + # when calling `transform`. + check_is_fitted(self, attributes=["transformers_"]) + ( + self.transformers_, + self._transformer_to_input_indices, + ) = _split_transformers( + self.transformers_, + during_fit=False, + transformers_to_input_indices=self._transformer_to_input_indices, + ) + + def _merge_univariate_transformers(self): + """ + Merge splitted transformers into a single transformer. + To be used after `_split_univariate_transformers`. + """ + # merge self.transformers and self.transformers_ + check_is_fitted(self, attributes=["transformers_"]) + self.transformers, _ = _merge_transformers(self.transformers, is_fitted=False) + self.transformers_, self._transformer_to_input_indices = _merge_transformers( + self.transformers_, + is_fitted=True, + transformer_to_input_indices=self._transformer_to_input_indices, + ) + def _auto_cast(self, X: pd.DataFrame) -> pd.DataFrame: """Takes a dataframe and tries to convert its columns to their best possible data type. 
@@ -842,6 +1037,11 @@ def fit_transform(self, X: ArrayLike, y: ArrayLike = None) -> ArrayLike:
         if self.verbose:
             print(f"[TableVectorizer] Assigned transformers: {self.transformers}")
 
+        # split the univariate transformers on each column
+        # to be able to parallelize the encoding
+        if self.is_parallelized:
+            self._split_univariate_transformers(during_fit=True)
+
         X_enc = super().fit_transform(X, y)
 
         # For the "remainder" columns, the `ColumnTransformer` `transformers_`
@@ -854,6 +1054,9 @@
             cols: list[int]
             self.transformers_[i] = (name, enc, [self.columns_[j] for j in cols])
 
+        if self.is_parallelized:
+            self._merge_univariate_transformers()
+
         return X_enc
 
     def transform(self, X: ArrayLike) -> ArrayLike:
@@ -882,7 +1085,17 @@ def transform(self, X: ArrayLike) -> ArrayLike:
         if self.auto_cast:
             X = self._apply_cast(X)
 
-        return super().transform(X)
+        # split the univariate transformers on each column
+        # to be able to parallelize the encoding
+        if self.is_parallelized:
+            self._split_univariate_transformers(during_fit=False)
+
+        res = super().transform(X)
+
+        if self.is_parallelized:
+            self._merge_univariate_transformers()
+
+        return res
 
     def get_feature_names_out(self, input_features=None) -> list[str]:
         """Return clean feature names.
diff --git a/skrub/_utils.py b/skrub/_utils.py
index 4e1efc2f6..95dd469ea 100644
--- a/skrub/_utils.py
+++ b/skrub/_utils.py
@@ -39,6 +39,14 @@ def __contains__(self, key: Hashable):
         return key in self.cache
 
 
+def combine_lru_dicts(capacity: int, *lru_dicts: LRUDict) -> LRUDict:
+    combined_lru_dict = LRUDict(capacity)
+    for lru_dict in lru_dicts:
+        for key, value in lru_dict.cache.items():
+            combined_lru_dict[key] = value
+    return combined_lru_dict
+
+
 def check_input(X) -> NDArray:
     """
     Check input with sklearn standards.
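A note on `combine_lru_dicts`, since it backs `MinHashEncoder._merge`: the merged cache is still bounded by `capacity`, so when the per-column caches together hold more entries, the earliest-inserted ones are dropped. A minimal sketch (hypothetical keys, and assuming `LRUDict` evicts its oldest entry on overflow)::

    from skrub._utils import LRUDict, combine_lru_dicts

    first, second = LRUDict(2), LRUDict(2)
    first["k1"] = 1
    first["k2"] = 2
    second["k3"] = 3

    merged = combine_lru_dicts(2, first, second)
    # only two entries fit: "k1" was inserted first and gets evicted
    assert set(merged.cache.keys()) == {"k2", "k3"}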
diff --git a/skrub/tests/test_gap_encoder.py b/skrub/tests/test_gap_encoder.py
index ca364b7c8..7b6086d77 100644
--- a/skrub/tests/test_gap_encoder.py
+++ b/skrub/tests/test_gap_encoder.py
@@ -1,6 +1,9 @@
+import copy
+
 import numpy as np
 import pandas as pd
 import pytest
+from numpy.testing import assert_array_equal
 from sklearn.exceptions import NotFittedError
 from sklearn.model_selection import train_test_split
@@ -275,3 +278,103 @@ def test_max_no_improvements_none() -> None:
     X = generate_data(300, random_state=0)
     enc_none = GapEncoder(n_components=2, max_no_improvement=None, random_state=42)
     enc_none.fit(X)
+
+
+def test_merge_transformers() -> None:
+    # test whether fitting on each column separately and then merging the
+    # transformers gives the same result as fitting on the whole dataset
+
+    # generate data
+    X = np.concatenate([generate_data(100, random_state=i) for i in range(3)], axis=1)
+    X = pd.DataFrame(X, columns=["col0", "col1", "col2"])
+
+    # fit on each column separately
+    enc_list = []
+    for i in range(3):
+        enc = GapEncoder(random_state=42)
+        enc.fit(X[[f"col{i}"]])
+        enc_list.append(enc)
+    enc_merged = GapEncoder._merge(enc_list)
+
+    # fit on the whole dataset
+    enc = GapEncoder(random_state=42)
+    enc.fit(X)
+
+    # check that the results are the same
+    # check transform
+    assert np.allclose(enc_merged.transform(X), enc.transform(X))
+    # check get_feature_names_out
+    assert enc_merged.get_feature_names_out() == enc.get_feature_names_out()
+    # check score
+    assert enc_merged.score(X) == enc.score(X)
+    # check all attributes
+    assert enc_merged.rho_ == enc.rho_
+    assert enc_merged.column_names_ == enc.column_names_
+
+
+def test_split_transformers() -> None:
+    # check that splitting the transformer after fitting
+    # doesn't change the output of transform
+
+    # generate data
+    X = np.concatenate([generate_data(100, random_state=i) for i in range(3)], axis=1)
+    X = pd.DataFrame(X, columns=["col0", "col1", "col2"])
+
+    # fit on the whole dataset
+    enc = GapEncoder(random_state=42)
+    enc.fit(X)
+
+    # split the transformer
+    enc_list = copy.deepcopy(enc)._split()
+
+    # check each column separately
+    index = 0
+    for i in range(3):
+        # check that the results are the same
+        # check transform
+        transformed_X_i = enc_list[i].transform(X[[f"col{i}"]])
+        assert np.allclose(
+            transformed_X_i,
+            enc.transform(X)[:, index : index + transformed_X_i.shape[1]],
+        )
+        # check get_feature_names_out
+        assert_array_equal(
+            np.array(enc_list[i].get_feature_names_out()),
+            np.array(enc.get_feature_names_out())[
+                index : index + transformed_X_i.shape[1]
+            ],
+        )
+        index += transformed_X_i.shape[1]
+        # check all attributes
+        assert enc_list[i].rho_ == enc.rho_
+        assert enc_list[i].column_names_ == [f"col{i}"]
+
+
+def test_split_and_merge_transformers() -> None:
+    # check that splitting the transformer after fitting
+    # and then merging the transformers doesn't change the result
+
+    # generate data
+    X = np.concatenate([generate_data(100, random_state=i) for i in range(3)], axis=1)
+    X = pd.DataFrame(X, columns=["col0", "col1", "col2"])
+
+    # fit on the whole dataset
+    enc = GapEncoder(random_state=42)
+    enc.fit(X)
+
+    # split the transformer
+    enc_list = copy.deepcopy(enc)._split()
+
+    # merge the transformers
+    enc_merged = GapEncoder._merge(enc_list)
+
+    # check that the results are the same
+    # check transform
+    assert_array_equal(enc_merged.transform(X), enc.transform(X))
+    # check get_feature_names_out
+    assert enc_merged.get_feature_names_out() == enc.get_feature_names_out()
+    # check score
+    assert enc_merged.score(X) == enc.score(X)
+    # check all attributes
+    assert enc_merged.rho_ == enc.rho_
+    assert enc_merged.column_names_ == enc.column_names_
diff --git a/skrub/tests/test_minhash_encoder.py b/skrub/tests/test_minhash_encoder.py
index de547434c..63ce8c148 100644
--- a/skrub/tests/test_minhash_encoder.py
+++ b/skrub/tests/test_minhash_encoder.py
@@ -1,3 +1,4 @@
+import copy
 import random
 from string import ascii_lowercase
@@ -294,3 +295,121 @@ def test_get_feature_names_out():
     encoder.fit(X)
     expected_columns = np.array(["x0_0", "x0_1", "x0_2", "x0_3"])
     assert_array_equal(np.array(encoder.get_feature_names_out()), expected_columns)
+
+
+def test_merge_transformers() -> None:
+    # check that fitting on each column separately and then merging the
+    # transformers gives the same result as fitting on the whole dataset
+
+    # generate data
+    X = np.concatenate([generate_data(100, random_state=i) for i in range(3)], axis=1)
+    X = pd.DataFrame(X, columns=["col0", "col1", "col2"])
+
+    # fit on each column separately
+    enc_list = []
+    for i in range(3):
+        enc = MinHashEncoder()
+        enc.fit(X[[f"col{i}"]])
+        enc_list.append(enc)
+    enc_merged = MinHashEncoder._merge(enc_list)
+
+    # fit on the whole dataset
+    enc = MinHashEncoder()
+    enc.fit(X)
+
+    # check that the results are the same
+    # check transform
+    assert_array_equal(enc_merged.transform(X), enc.transform(X))
+    # check get_feature_names_out
+    # assert enc_merged.get_feature_names_out() == enc.get_feature_names_out()
+    # check that the hash_dict_ attribute is the same
+    assert enc.hash_dict_.cache.keys() == enc_merged.hash_dict_.cache.keys()
+    for key in enc.hash_dict_.cache.keys():
+        assert_array_equal(enc.hash_dict_.cache[key], enc_merged.hash_dict_.cache[key])
+    # check all attributes
+    assert enc_merged._capacity == enc._capacity
+    assert enc_merged.n_features_in_ == enc.n_features_in_
+    # check feature_names_in_
+    assert_array_equal(enc_merged.feature_names_in_, enc.feature_names_in_)
+
+
+def test_split_transformers() -> None:
+    # check that splitting the transformer after fitting
+    # doesn't change the output of transform
+
+    # generate data
+    X = np.concatenate([generate_data(100, random_state=i) for i in range(3)], axis=1)
+    X = pd.DataFrame(X, columns=["col0", "col1", "col2"])
+
+    # fit on the whole dataset
+    enc = MinHashEncoder()
+    enc.fit_transform(X)
+
+    # split the transformer
+    enc_list = copy.deepcopy(enc)._split()
+
+    # check each column separately
+    index = 0
+    for i in range(3):
+        # check that the results are the same
+        # check transform
+        transformed_X_i = enc_list[i].transform(X[[f"col{i}"]])
+        assert_array_equal(
+            transformed_X_i,
+            enc.transform(X)[:, index : index + transformed_X_i.shape[1]],
+        )
+        # check get_feature_names_out
+        assert_array_equal(
+            np.array(enc_list[i].get_feature_names_out()),
+            np.array(enc.get_feature_names_out())[
+                index : index + transformed_X_i.shape[1]
+            ],
+        )
+        # check self.feature_names_in_
+        assert enc_list[i].feature_names_in_ == [f"col{i}"]
+        # check self.n_features_in_
+        assert enc_list[i].n_features_in_ == 1
+        index += transformed_X_i.shape[1]
+        # check all attributes
+        assert enc_list[i]._capacity == enc._capacity
+        # check hash_dict_
+        # TODO: do we want the hash_dict_ to be the same?
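+        # Note: `_split` copies the `hash_dict_` reference into every split
+        # encoder instead of deep-copying it, so the caches compared below
+        # are the same underlying object.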
+        assert enc.hash_dict_.cache.keys() == enc_list[i].hash_dict_.cache.keys()
+        for key in enc.hash_dict_.cache.keys():
+            assert_array_equal(
+                enc.hash_dict_.cache[key], enc_list[i].hash_dict_.cache[key]
+            )
+
+
+def test_split_and_merge_transformers() -> None:
+    # check that splitting the transformer after fitting
+    # and then merging the transformers doesn't change the result
+
+    # generate data
+    X = np.concatenate([generate_data(100, random_state=i) for i in range(3)], axis=1)
+    X = pd.DataFrame(X, columns=["col0", "col1", "col2"])
+
+    # fit on the whole dataset
+    enc = MinHashEncoder()
+    enc.fit(X)
+
+    # split the transformer
+    enc_list = copy.deepcopy(enc)._split()
+
+    # merge the transformers
+    enc_merged = MinHashEncoder._merge(enc_list)
+
+    # check that the results are the same
+    # check transform
+    assert_array_equal(enc_merged.transform(X), enc.transform(X))
+    # check get_feature_names_out
+    assert enc_merged.get_feature_names_out() == enc.get_feature_names_out()
+    # check hash_dict_
+    assert enc.hash_dict_.cache.keys() == enc_merged.hash_dict_.cache.keys()
+    for key in enc.hash_dict_.cache.keys():
+        assert_array_equal(enc.hash_dict_.cache[key], enc_merged.hash_dict_.cache[key])
+    # check all attributes
+    assert enc_merged._capacity == enc._capacity
+    assert enc_merged.n_features_in_ == enc.n_features_in_
+    # check feature_names_in_
+    assert_array_equal(enc_merged.feature_names_in_, enc.feature_names_in_)
diff --git a/skrub/tests/test_table_vectorizer.py b/skrub/tests/test_table_vectorizer.py
index e72c9d0db..e978f3242 100644
--- a/skrub/tests/test_table_vectorizer.py
+++ b/skrub/tests/test_table_vectorizer.py
@@ -1,15 +1,20 @@
+import joblib
 import numpy as np
 import pandas as pd
 import pytest
 from sklearn.exceptions import NotFittedError
-from sklearn.preprocessing import FunctionTransformer, StandardScaler
+from sklearn.preprocessing import FunctionTransformer, OneHotEncoder, StandardScaler
+from sklearn.utils._testing import assert_array_equal, skip_if_no_parallel
 from sklearn.utils.validation import check_is_fitted
 
 from skrub import GapEncoder, MinHashEncoder, SuperVectorizer, TableVectorizer
 from skrub._table_vectorizer import _infer_date_format
+from skrub.tests.utils import transformers_list_equal
 
 
-def check_same_transformers(expected_transformers: dict, actual_transformers: list):
+def check_same_transformers(
+    expected_transformers: dict, actual_transformers: list
+) -> None:
     # Construct the dict from the actual transformers
     actual_transformers_dict = {name: cols for name, trans, cols in actual_transformers}
     assert actual_transformers_dict == expected_transformers
@@ -731,7 +736,7 @@ def test_deterministic(pipeline) -> None:
         X_enc_prev = X_enc
 
 
-def test_mixed_types():
+def test_mixed_types() -> None:
     # TODO: datetime/str mixed types
     # don't work
     df = _get_mixed_types_dataframe()
@@ -804,7 +809,9 @@
         ),
     ],
 )
-def test_changing_types(X_fit, X_transform_original, X_transform_with_missing_original):
+def test_changing_types(
+    X_fit, X_transform_original, X_transform_with_missing_original
+) -> None:
     """
     Test that the TableVectorizer performs properly when the
     type inferred during fit does not match the type of the
@@ -834,7 +841,7 @@
     assert np.allclose(res, res_missing, equal_nan=True)
 
 
-def test_changing_types_int_float():
+def test_changing_types_int_float() -> None:
     # The TableVectorizer shouldn't cast floats to ints
     # even if only ints were seen during fit
     X_fit, X_transform = (
@@ -847,6 +854,212 @@
     assert np.allclose(res, np.array([[1.0], [2.0], [3.3]]))
 
 
+def test_column_by_column() -> None:
+    # Test that the TableVectorizer gives the same result
+    # when applied column by column
+    X = _get_clean_dataframe()
+    table_vec_all_cols = TableVectorizer(
+        high_card_cat_transformer=GapEncoder(n_components=2, random_state=0),
+        cardinality_threshold=4,
+    )
+    table_vec_all_cols.fit(X)
+    for col in X.columns:
+        table_vec_one_col = TableVectorizer(
+            high_card_cat_transformer=GapEncoder(n_components=2, random_state=0),
+            cardinality_threshold=4,
+        )
+        table_vec_one_col.fit(X[[col]])
+        features_from_col = [
+            feat
+            for feat in table_vec_all_cols.get_feature_names_out()
+            if feat.startswith(col)
+        ]
+        assert table_vec_one_col.get_feature_names_out() == features_from_col
+        indices_features_from_col = [
+            table_vec_all_cols.get_feature_names_out().index(feat)
+            for feat in features_from_col
+        ]
+        expected_result = table_vec_all_cols.transform(X)[:, indices_features_from_col]
+        assert np.allclose(
+            table_vec_one_col.transform(X[[col]]),
+            expected_result,
+        )
+
+
+@skip_if_no_parallel
+@pytest.mark.parametrize(
+    "high_card_cat_transformer",
+    # the GapEncoder and the MinHashEncoder should be split and
+    # parallelized over all columns; the OneHotEncoder should not be split
+    [
+        GapEncoder(n_components=2, random_state=0),
+        OneHotEncoder(),
+        MinHashEncoder(n_components=2),
+    ],
+)
+def test_parallelism(high_card_cat_transformer) -> None:
+    # Test that parallelism works
+    X = _get_clean_dataframe()
+    table_vec_no_parallel = TableVectorizer(
+        high_card_cat_transformer=high_card_cat_transformer,
+        cardinality_threshold=4,
+    )
+    X_trans = table_vec_no_parallel.fit_transform(X)
+    with joblib.parallel_backend("loky"):
+        for n_jobs in [None, 2, -1]:
+            table_vec = TableVectorizer(
+                n_jobs=n_jobs,
+                high_card_cat_transformer=high_card_cat_transformer,
+                cardinality_threshold=4,
+            )
+            X_trans_parallel = table_vec.fit_transform(X)
+            assert_array_equal(X_trans, X_trans_parallel)
+            assert table_vec.n_jobs == n_jobs
+            # assert that all attributes are equal except for
+            # the n_jobs attribute
+            assert transformers_list_equal(
+                table_vec.transformers_, table_vec_no_parallel.transformers_
+            )
+            assert (table_vec.columns_ == table_vec_no_parallel.columns_).all()
+            assert table_vec.types_ == table_vec_no_parallel.types_
+            assert table_vec.imputed_columns_ == table_vec_no_parallel.imputed_columns_
+            # assert that get_feature_names_out gives the same result
+            assert_array_equal(
+                table_vec.get_feature_names_out(),
+                table_vec_no_parallel.get_feature_names_out(),
+            )
+            # assert that get_params gives the same result except for n_jobs,
+            # which is removed from both dicts before comparing
+            params = table_vec.get_params()
+            params.pop("n_jobs")
+            params_no_parallel = table_vec_no_parallel.get_params()
+            params_no_parallel.pop("n_jobs")
+            assert str(params) == str(params_no_parallel)
+            # assert that transform gives the same result
+            assert_array_equal(
+                table_vec.transform(X), table_vec_no_parallel.transform(X)
+            )
+
+
+@pytest.mark.parametrize(
+    "high_card_cat_transformer",
+    [
+        GapEncoder(n_components=2, random_state=0),
+        MinHashEncoder(n_components=2),
+    ],
+)
+def test_split_and_merge_univariate_transformers(high_card_cat_transformer) -> None:
+    X = _get_clean_dataframe()
+    enc = TableVectorizer(
+        high_card_cat_transformer=high_card_cat_transformer,
+        cardinality_threshold=4,
+        n_jobs=None,  # disables the automatic splitting and merging
+    )
+
+    enc.fit(X)
+    assert len(enc.transformers) == 3
+
+    enc_split = TableVectorizer(
+        high_card_cat_transformer=high_card_cat_transformer,
+        cardinality_threshold=4,
+        n_jobs=None,
+    )
+    enc_split.fit(X)
+    # in normal use, this happens inside fit
+    enc_split._split_univariate_transformers(during_fit=True)
+    # check that the high_card_cat_transformer
+    # is split into 2 transformers
+    # the transformers_ attribute should not be modified
+    # because during_fit is True
+    assert len(enc_split.transformers) == 4
+    assert len(enc_split.transformers_) == 3
+    enc_split._merge_univariate_transformers()
+    # check that the split transformer is merged back into 1 transformer
+    assert len(enc_split.transformers) == 3
+    assert np.allclose(enc.transform(X), enc_split.transform(X))
+    # assert that the transformers attribute is the same as
+    # the one before splitting and merging
+    assert str(enc.transformers) == str(enc_split.transformers)
+    # check that you can refit the transformer
+    enc_split.fit(X)
+
+    # Now split the transformers_ attribute (during_fit=False)
+    enc_split._split_univariate_transformers(during_fit=False)
+    assert len(enc_split.transformers) == 3
+    assert len(enc_split.transformers_) == 4
+    # the fitted transformers should still work
+    assert_array_equal(enc.transform(X), enc_split.transform(X))
+
+    enc_split._merge_univariate_transformers()
+    # check that the split transformer is merged back into 1 transformer
+    assert len(enc_split.transformers_) == 3
+    assert_array_equal(enc.transform(X), enc_split.transform(X))
+
+    # assert that the transformers_ attribute is the same as
+    # the one before splitting and merging
+    assert str(enc.transformers_) == str(enc_split.transformers_)
+
+
+def test_split_one_hot_encoder() -> None:
+    # check that a OneHotEncoder is not split
+    X = _get_clean_dataframe()
+    enc_one_hot = TableVectorizer(
+        high_card_cat_transformer=OneHotEncoder(handle_unknown="error"),
+        low_card_cat_transformer=OneHotEncoder(
+            handle_unknown="ignore"
+        ),  # change the default to have a different transformer
+        cardinality_threshold=4,
+        n_jobs=None,
+    )
+    enc_one_hot.fit(X)
+    assert len(enc_one_hot.transformers) == 3
+
+
+@pytest.mark.parametrize(
+    "low_card_cat_transformer",
+    [
+        # transformers that should be split
+        MinHashEncoder(n_components=2),
+        GapEncoder(n_components=2),
+    ],
+)
+@skip_if_no_parallel
+def test_modifying_transformers(low_card_cat_transformer) -> None:
+    """Check that the splitting/merging mechanism doesn't
+    prevent resetting the transformers."""
+
+    # test that modifying a transformer before refitting works
+    # https://github.com/skrub-data/skrub/pull/592#discussion_r1284531301
+    tb = TableVectorizer(low_card_cat_transformer=low_card_cat_transformer, n_jobs=2)
+    X = _get_clean_dataframe()
+    tb.fit_transform(X)
+    tb.low_card_cat_transformer = "passthrough"
+    tb.fit_transform(X)
+    assert tb.low_card_cat_transformer_ == "passthrough"
+    assert tb.transformers[0][1] == "passthrough"
+    assert tb.transformers_[0][1] == "passthrough"
+    assert tb.transform(X).shape == (5, 6)
+
+    # test that a failed fit_transform doesn't break the following fit_transform
+    # https://github.com/skrub-data/skrub/pull/592#discussion_r1278591301
+    tb = TableVectorizer(
+        low_card_cat_transformer=low_card_cat_transformer,
+        # to make ColumnTransformer fit_transform fail
+        numerical_transformer="not_applicable",
+        n_jobs=2,
+    )
+    with pytest.raises(TypeError):
+        tb.fit_transform(X)
+    assert len(tb.transformers) == 5  # the transformers have been split
+    # but not merged
+    tb.numerical_transformer = "passthrough"
+    tb.fit_transform(X)
+    assert len(tb.transformers) == 2  # the transformers have been split
+    # and merged correctly
+    assert len(tb.transformers_) == 2
+
+
 def test_table_vectorizer_remainder_cloning():
     """Check that remainder is cloned when used."""
     df1 = _get_clean_dataframe()
diff --git a/skrub/tests/utils.py b/skrub/tests/utils.py
index 5572cf4e5..d99c8b6e3 100644
--- a/skrub/tests/utils.py
+++ b/skrub/tests/utils.py
@@ -27,3 +27,79 @@ def generate_data(
     else:
         X = np.array(str_list).reshape(n_samples, 1)
     return X
+
+
+def is_valid_attribute(attribute):
+    # check that the attribute's type is simple enough
+    # to be compared by value
+    valid_types = (
+        int,
+        float,
+        np.ndarray,
+        str,
+        bool,
+        type(None),
+        list,
+        tuple,
+        dict,
+        set,
+    )
+
+    if isinstance(attribute, (list, tuple, set)):
+        return all(is_valid_attribute(item) for item in attribute)
+    elif isinstance(attribute, dict):
+        return all(
+            is_valid_attribute(key) and is_valid_attribute(value)
+            for key, value in attribute.items()
+        )
+    else:
+        return isinstance(attribute, valid_types)
+
+
+def transformers_equal(transformer1, transformer2):
+    # Check if the transformers are of the same type
+    if type(transformer1) is not type(transformer2):
+        return False
+
+    # if string transformers, check if they are the same
+    if isinstance(transformer1, str):
+        return transformer1 == transformer2
+
+    # Compare hyperparameters
+    if transformer1.get_params() != transformer2.get_params():
+        return False
+
+    # Compare fitted attributes
+    for attribute in transformer1.__dict__:
+        if attribute.endswith("_"):
+            if not is_valid_attribute(getattr(transformer1, attribute)):
+                # the attribute is too complex to compare by value,
+                # so only check that the types match
+                if not isinstance(
+                    getattr(transformer1, attribute),
+                    type(getattr(transformer2, attribute)),
+                ):
+                    return False
+            else:
+                if not np.array_equal(
+                    getattr(transformer1, attribute), getattr(transformer2, attribute)
+                ):
+                    return False
+
+    return True
+
+
+def transformers_list_equal(transformers_list1, transformers_list2):
+    # check equality for lists of 3-tuples (name, transformer, columns)
+    # used in the TableVectorizer
+    if len(transformers_list1) != len(transformers_list2):
+        return False
+    for (name1, transformer1, columns1), (name2, transformer2, columns2) in zip(
+        transformers_list1, transformers_list2
+    ):
+        if name1 != name2:
+            return False
+        if columns1 != columns2:
+            return False
+        if not transformers_equal(transformer1, transformer2):
+            return False
+    return True
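End to end, the feature this PR adds is used as in the sketch below (not part of the diff; the column names are made up). With `n_jobs` set, each transformer carrying the "univariate" tag is fitted as one joblib task per column, and the fitted clones are merged back afterwards, so `transformers_` looks the same as in the sequential case::

    import pandas as pd
    from skrub import MinHashEncoder, TableVectorizer

    df = pd.DataFrame(
        {
            "name": ["alice smith", "bob jones", "carol baker"],
            "city": ["paris", "london", "berlin"],
            "age": [33, 41, 27],
        }
    )

    # Both low-cardinality string columns get the MinHashEncoder, which is
    # tagged "univariate", so fitting is parallelized column by column.
    vectorizer = TableVectorizer(
        low_card_cat_transformer=MinHashEncoder(n_components=8),
        n_jobs=2,
    )
    X = vectorizer.fit_transform(df)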