Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize the TableVectorizer column-wise #592

Merged
merged 62 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from 52 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
eb0a020
fall back to pandas if no datetime format is found
LeoGrin Jun 10, 2023
f813fbb
change changelog
LeoGrin Jun 10, 2023
5d5e862
Merge branch 'main' of https://github.com/skrub-data/skrub into table…
LeoGrin Jun 12, 2023
d9a374f
first working version
LeoGrin Jun 12, 2023
ffa9755
add tests
LeoGrin Jun 12, 2023
5bcb6dd
update changelog
LeoGrin Jun 12, 2023
c005a49
typo
LeoGrin Jun 12, 2023
85b7f9c
copy transformer when split between columns to avoid conflict
LeoGrin Jun 13, 2023
26e66bd
Merge branch 'main' into table_vec_parallelism
LeoGrin Jun 13, 2023
c3706c7
update changelog
LeoGrin Jun 12, 2023
4147549
fix bug with repeated transformer
LeoGrin Jun 13, 2023
7771eb6
split and merge
LeoGrin Jun 22, 2023
8acca83
more tests
LeoGrin Jun 22, 2023
8dd62cb
merge with main
LeoGrin Jun 22, 2023
b997d28
fix test
LeoGrin Jun 23, 2023
f1ec2dd
also split self.transformers_ to parallelize transform
LeoGrin Jun 23, 2023
198fb04
merging with latest change
LeoGrin Jun 26, 2023
4780dfc
add _split to minhashencoder
LeoGrin Jun 26, 2023
744e881
cleaning
LeoGrin Jun 27, 2023
856892b
clean test
LeoGrin Jun 27, 2023
4cb44a5
merge with main
LeoGrin Jul 17, 2023
e50323a
fix wrong merge
LeoGrin Jul 17, 2023
6d007cf
fix wrong merge
LeoGrin Jul 17, 2023
f2b4266
Merge branch 'main' into table_vec_parallelism
LeoGrin Jul 18, 2023
970a73a
Apply suggestions from code review
LeoGrin Jul 19, 2023
d15e7e5
Update skrub/_utils.py
LeoGrin Jul 19, 2023
4ae9ae2
apply Lilian's suggestions
LeoGrin Jul 19, 2023
b5848e7
add future annotation to avoid circular import in type hints
LeoGrin Jul 19, 2023
6f645fc
add docstring
LeoGrin Jul 19, 2023
9753bd0
Update skrub/_table_vectorizer.py
LeoGrin Jul 20, 2023
6e48fab
type hint
LeoGrin Jul 20, 2023
76ea8a9
Merge branch 'table_vec_parallelism' of https://github.com/LeoGrin/sk…
LeoGrin Jul 20, 2023
63e5afd
revert change
LeoGrin Jul 20, 2023
3b768ea
Merge branch 'main' into table_vec_parallelism
LeoGrin Jul 20, 2023
20682dd
Merge remote-tracking branch 'upstream/main' into table_vec_parallelism
LeoGrin Jul 27, 2023
2fc38b9
Merge branch 'table_vec_parallelism' of https://github.com/LeoGrin/sk…
LeoGrin Jul 27, 2023
5e6966a
first batch of Vincent's suggestions
LeoGrin Aug 4, 2023
da06783
use tags
LeoGrin Aug 4, 2023
3671fb6
Merge branch 'main' into table_vec_parallelism
LeoGrin Aug 4, 2023
f8b177a
compare fitted transformers better
LeoGrin Aug 4, 2023
e1a98ea
Merge branch 'table_vec_parallelism' of https://github.com/LeoGrin/sk…
LeoGrin Aug 4, 2023
02bf26e
talk about tags in changelog
LeoGrin Aug 4, 2023
bb7ad81
run precommit checks
LeoGrin Aug 4, 2023
9dd62bb
get rid of _transformers_original
LeoGrin Aug 8, 2023
d6939f5
clean tests
LeoGrin Aug 18, 2023
9157451
Merge remote-tracking branch 'upstream/main' into table_vec_parallelism
LeoGrin Aug 25, 2023
99b3a9b
Apply suggestions from code review
LeoGrin Aug 26, 2023
769df3a
_parallel_on_columns
LeoGrin Aug 28, 2023
28cfb4c
explain transformers vs transformers_
LeoGrin Aug 29, 2023
e338fe1
Merge branch 'table_vec_parallelism' of https://github.com/LeoGrin/sk…
LeoGrin Aug 29, 2023
9aec058
split merge into two functions
LeoGrin Aug 29, 2023
72dd3d4
add tests to check that splitting doesn't prevent resetting transformers
LeoGrin Aug 29, 2023
2d5fd3a
Apply suggestions from code review
LeoGrin Aug 30, 2023
2eb6f5b
combine merge_unfitted and merge_fitted and move _split outside of class
LeoGrin Aug 30, 2023
7f7b524
don't return empty new_transformer_to_input_indices
LeoGrin Aug 30, 2023
ebcca20
remove future warning
LeoGrin Aug 30, 2023
d8e10ce
remove future warning
LeoGrin Sep 1, 2023
25d854b
add docstrings
LeoGrin Sep 1, 2023
0d381d0
fix merge
LeoGrin Sep 1, 2023
1cef43d
fix test
LeoGrin Sep 1, 2023
eabdb8b
Update skrub/_table_vectorizer.py
LeoGrin Sep 1, 2023
134a413
fix type hints
LeoGrin Sep 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ Major changes
* Parallelized the :class:`GapEncoder` column-wise. Parameters `n_jobs` and `verbose`
added to the signature. :pr:`582` by :user:`Lilian Boulard <LilianBoulard>`


Minor changes
-------------

* When possible, parallelism is done at the column level rather than the transformer level in :class:`TableVectorizer`.
This is the case for transformers with the tag `univariate` (right now :class:`MinHashEncoder` and :class:`GapEncoder`).
LeoGrin marked this conversation as resolved.
Show resolved Hide resolved
:pr:`592` by :user:`Leo Grinsztajn <LeoGrin>`

* Parallelized the :func:`deduplicate` function. Parameter `n_jobs`
added to the signature. :pr:`618` by :user:`Jovan Stojanovic <jovan-stojanovic>`
and :user:`Lilian Boulard <LilianBoulard>`
Expand Down Expand Up @@ -156,6 +164,8 @@ Minor changes
:pr:`543` by :user:`Leo Grinsztajn <LeoGrin>`
:pr:`587` by :user:`Leo Grinsztajn <LeoGrin>`



Dirty-cat Release 0.4.0
=========================

Expand Down
36 changes: 35 additions & 1 deletion skrub/_gap_encoder.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Implements the GapEncoder: a probabilistic encoder for categorical variables.
"""
from __future__ import annotations
Vincent-Maladiere marked this conversation as resolved.
Show resolved Hide resolved

from collections.abc import Generator
from copy import deepcopy
Expand All @@ -12,7 +13,7 @@
from numpy.random import RandomState
from numpy.typing import ArrayLike, NDArray
from scipy import sparse
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.base import BaseEstimator, TransformerMixin, clone
from sklearn.cluster import KMeans, kmeans_plusplus
from sklearn.decomposition._nmf import _beta_divergence
from sklearn.feature_extraction.text import CountVectorizer, HashingVectorizer
Expand Down Expand Up @@ -645,6 +646,35 @@ class GapEncoder(TransformerMixin, BaseEstimator):
fitted_models_: list[GapEncoderColumn]
column_names_: list[str]

@classmethod
def _merge(cls, transformers_list: list[GapEncoder]):
# merge GapEncoder fitted on different columns
# into a single GapEncoder
# useful for parallelization in the TableVectorizer
full_transformer = clone(transformers_list[0])
# assert rho_ is the same for all transformers
rho_ = transformers_list[0].rho_
full_transformer.rho_ = rho_
full_transformer.fitted_models_ = []
for transformers in transformers_list:
full_transformer.fitted_models_.extend(transformers.fitted_models_)
if hasattr(transformers_list[0], "column_names_"):
full_transformer.column_names_ = []
for transformers in transformers_list:
full_transformer.column_names_.extend(transformers.column_names_)
return full_transformer

def _split(self):
check_is_fitted(self)
transformers_list = []
for i, model in enumerate(self.fitted_models_):
transformer = clone(self)
transformer.rho_ = model.rho_
transformer.fitted_models_ = [model]
transformer.column_names_ = [self.column_names_[i]]
transformers_list.append(transformer)
return transformers_list

def __init__(
self,
*,
Expand Down Expand Up @@ -932,6 +962,10 @@ def _more_tags(self):
),
"check_estimators_dtypes": "We only support string dtypes.",
},
"univariate": True, # whether the estimator is univariate and can be
# applied column by column. This is useful for the TableVectorizer,
# to decide whether to apply the transformer on each column separately
# and thus improve the parallelization when the transformer is slow enough.
}


Expand Down
42 changes: 40 additions & 2 deletions skrub/_minhash_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,21 @@
Implements the MinHashEncoder, which encodes string categorical features by
applying the MinHash method to n-gram decompositions of strings.
"""
from __future__ import annotations
Vincent-Maladiere marked this conversation as resolved.
Show resolved Hide resolved

from collections.abc import Callable, Collection
from typing import Literal

import numpy as np
from joblib import Parallel, delayed, effective_n_jobs
from numpy.typing import ArrayLike, NDArray
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.base import BaseEstimator, TransformerMixin, clone
from sklearn.utils import gen_even_slices, murmurhash3_32
from sklearn.utils.validation import _check_feature_names_in, check_is_fitted

from ._fast_hash import ngram_min_hash
from ._string_distances import get_unique_ngrams
from ._utils import LRUDict, check_input
from ._utils import LRUDict, check_input, combine_lru_dicts

NoneType = type(None)

Expand Down Expand Up @@ -120,6 +121,39 @@ class MinHashEncoder(TransformerMixin, BaseEstimator):

_capacity: int = 2**10

@classmethod
def _merge(cls, transformers_list: list[MinHashEncoder]):
LeoGrin marked this conversation as resolved.
Show resolved Hide resolved
# merge MinHashEncoder fitted on different columns
# into a single MinHashEncoder
# useful for parallelization in the TableVectorizer
full_transformer = clone(transformers_list[0])
capacity = transformers_list[0]._capacity
full_transformer.hash_dict_ = combine_lru_dicts(
capacity, *[transformer.hash_dict_ for transformer in transformers_list]
)
full_transformer.n_features_in_ = sum(
transformer.n_features_in_ for transformer in transformers_list
)
full_transformer.feature_names_in_ = np.concatenate(
[transformer.feature_names_in_ for transformer in transformers_list]
)
return full_transformer

def _split(self):
check_is_fitted(self)
transformer_list = []
for i in range(self.n_features_in_):
trans = clone(self)
attributes = ["hash_dict_", "_capacity"]
for a in attributes:
if hasattr(self, a):
setattr(trans, a, getattr(self, a))
# TODO; do we want to deepcopy hash_dict_
trans.n_features_in_ = 1
trans.feature_names_in_ = np.array([self.feature_names_in_[i]])
transformer_list.append(trans)
return transformer_list

def __init__(
self,
*,
Expand Down Expand Up @@ -395,4 +429,8 @@ def _more_tags(self):
),
"check_estimators_dtypes": "We only support string dtypes.",
},
"univariate": True, # whether the estimator is univariate and can be
# applied column by column. This is useful for the TableVectorizer,
# to decide whether to apply the transformer on each column separately
# and thus improve the parallelization when the transformer is slow enough.
}
171 changes: 170 additions & 1 deletion skrub/_table_vectorizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,76 @@ def _replace_missing_in_cat_col(ser: pd.Series, value: str = "missing") -> pd.Se
return ser


def _parallel_on_columns(trans: TransformerMixin, cols: list[str]):
LeoGrin marked this conversation as resolved.
Show resolved Hide resolved
"""
Assert whether we want to parallelize the transformer over
the columns or not. We only want to parallelize if the transformer
LeoGrin marked this conversation as resolved.
Show resolved Hide resolved
is "univariate" (i.e. it can be duplicated for each column).
"""
return (
(not isinstance(trans, str))
and trans._get_tags().get("univariate", False)
and len(cols) > 1
)

LeoGrin marked this conversation as resolved.
Show resolved Hide resolved

def _merge_unfitted_transformers(
transformers: list[tuple[str, str | TransformerMixin, list[str]]]
):
LeoGrin marked this conversation as resolved.
Show resolved Hide resolved
new_transformers = []
base_names = pd.unique([name.split("_split_")[0] for name, _, _ in transformers])
for base_name in base_names:
for name, trans, _ in transformers:
if name.startswith(base_name):
new_trans = trans if isinstance(trans, str) else clone(trans)
break
new_transformers.append(
(
base_name,
new_trans,
[
col
for name, _, cols in transformers
for col in cols
if name.startswith(base_name)
],
)
)
return new_transformers


def _merge_fitted_transformers(
transformers_: list[tuple[str, str | TransformerMixin, list[str]]],
_transformer_to_input_indices: dict[str, list[int]],
):
new_transformers_ = []
new_transformer_to_input_indices = {}
base_names = pd.unique([name.split("_split_")[0] for name, _, _ in transformers_])
for base_name in base_names:
# merge all transformers with the same base name
transformers, columns, names = [], [], []
for name, trans, cols in transformers_:
if name.startswith(base_name):
columns.extend(cols)
transformers.append(trans)
names.append(name)
if len(transformers) == 1:
new_transformers_.append((base_name, transformers[0], columns))
new_transformer_to_input_indices[base_name] = list(
_transformer_to_input_indices[base_name]
)
else:
# merge transformers
new_transformers_.append(
(base_name, transformers[0].__class__._merge(transformers), columns)
)
new_transformer_to_input_indices[base_name] = list(
np.concatenate([_transformer_to_input_indices[name] for name in names])
)

return new_transformers_, new_transformer_to_input_indices


OptionalTransformer = (
TransformerMixin | Literal["drop", "remainder", "passthrough"] | None
)
Expand Down Expand Up @@ -352,6 +422,8 @@ class TableVectorizer(ColumnTransformer):
types_: dict[str, type]
imputed_columns_: list[str]

_transformer_to_input_indices: dict[str, list[int]]

# Override required parameters
_required_parameters = []

Expand Down Expand Up @@ -405,6 +477,13 @@ def _more_tags(self):
},
}

@property
def is_parallelized(self) -> bool:
"""
Returns True if the transformers are parallelized over columns, False otherwise.
"""
return self.n_jobs not in (None, 1)

def _clone_transformers(self):
"""
For each of the different transformers that can be passed,
Expand Down Expand Up @@ -474,6 +553,78 @@ def _clone_transformers(self):

# TODO: check that the provided transformers are valid

def _split_univariate_transformers(self, during_fit: bool = False):
"""
Split univariate transformers into multiple transformers, one for each
column. This is useful to use the inherited `ColumnTransformer` class
parallelism.

Parameters
----------
during_fit : bool, default=False
Whether the method is called during `fit_transform` (True) or
during `transform` (False). This is used to determine whether
to split the self.transformers_ attribute (when False) or the
self.transformers attribute (when True).
"""
if during_fit:
LeoGrin marked this conversation as resolved.
Show resolved Hide resolved
# split self.transformers, a list of 3-tuples (name, transformer, columns)
# containing the unfitted transformers (or strings) and the columns
# to be fitted on. This attribute is used by the `ColumnTransformer`
# when calling `fit` and `fit_transform`.
new_transformers = []
for name, trans, cols in self.transformers:
if _parallel_on_columns(trans, cols):
for i, col in enumerate(cols):
new_transformers.append(
(f"{name}_split_{i}", clone(trans), [col])
)
else:
new_transformers.append((name, trans, cols))
self.transformers = new_transformers
LeoGrin marked this conversation as resolved.
Show resolved Hide resolved
else:
# split self.transformers_, a list of 3-tuples (name, transformer, columns)
# containing the fitted transformers (or strings) and the columns
# they were fitted on. This attribute is used by the `ColumnTransformer`
# when calling `transform`.
check_is_fitted(self, attributes=["transformers_"])
self._transformers_fitted_original = self.transformers_
new_transformers_ = []
new_transformer_to_input_indices = {}
for name, trans, cols in self.transformers_:
if _parallel_on_columns(trans, cols):
splitted_transformers_ = trans._split()
for i, (col, trans, trans_to_mapping) in enumerate(
zip(
cols,
splitted_transformers_,
self._transformer_to_input_indices[name],
)
):
name_split = f"{name}_split_{i}"
new_transformers_.append((name_split, trans, [col]))
new_transformer_to_input_indices[name_split] = [
trans_to_mapping
]
else:
new_transformers_.append((name, trans, cols))
new_transformer_to_input_indices[
name
] = self._transformer_to_input_indices[name]
self.transformers_ = new_transformers_
self._transformer_to_input_indices = new_transformer_to_input_indices

def _merge_univariate_transformers(self):
# merge back self.transformers and self.transformers_
check_is_fitted(self, attributes=["transformers_"])
self.transformers = _merge_unfitted_transformers(self.transformers)
(
self.transformers_,
self._transformer_to_input_indices,
) = _merge_fitted_transformers(
self.transformers_, self._transformer_to_input_indices
)

def _auto_cast(self, X: pd.DataFrame) -> pd.DataFrame:
"""Takes a dataframe and tries to convert its columns to their best possible data type.

Expand Down Expand Up @@ -777,6 +928,11 @@ def fit_transform(self, X: ArrayLike, y: ArrayLike = None) -> ArrayLike:
if self.verbose:
print(f"[TableVectorizer] Assigned transformers: {self.transformers}")

# split the univariate transformers on each column
# to be able to parallelize the encoding
if self.is_parallelized:
self._split_univariate_transformers(during_fit=True)

X_enc = super().fit_transform(X, y)

# For the "remainder" columns, the `ColumnTransformer` `transformers_`
Expand All @@ -789,6 +945,9 @@ def fit_transform(self, X: ArrayLike, y: ArrayLike = None) -> ArrayLike:
cols: list[int]
self.transformers_[i] = (name, enc, [self.columns_[j] for j in cols])

if self.is_parallelized:
self._merge_univariate_transformers()

return X_enc

def transform(self, X: ArrayLike) -> ArrayLike:
Expand Down Expand Up @@ -817,7 +976,17 @@ def transform(self, X: ArrayLike) -> ArrayLike:
if self.auto_cast:
X = self._apply_cast(X)

return super().transform(X)
# split the univariate transformers on each column
# to be able to parallelize the encoding
if self.is_parallelized:
self._split_univariate_transformers(during_fit=False)

res = super().transform(X)

if self.is_parallelized:
self._merge_univariate_transformers()

return res

def get_feature_names_out(self, input_features=None) -> list[str]:
"""Return clean feature names.
Expand Down
8 changes: 8 additions & 0 deletions skrub/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ def __contains__(self, key: Hashable):
return key in self.cache


def combine_lru_dicts(capacity: int, *lru_dicts: LRUDict) -> LRUDict:
combined_lru_dict = LRUDict(capacity)
for lru_dict in lru_dicts:
for key, value in lru_dict.cache.items():
combined_lru_dict[key] = value
return combined_lru_dict


def check_input(X) -> NDArray:
"""
Check input with sklearn standards.
Expand Down
Loading
Loading