Merge pull request #332 from ZmeiGorynych/ray
Add support for Ray in running trials, fix Pandas warning
EgorKraevTransferwise authored Dec 18, 2024
2 parents 2302262 + 185dcc8 commit 857bb02
Showing 10 changed files with 456 additions and 246 deletions.
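In short: trial execution can now be routed through Ray. The constructor's use_ray flag (which, per the inline comment, needs ray installed via pip install flaml[ray]) is now stored on the instance, fit() accepts an optional use_ray override, and _tune_with_config dispatches _estimate_effect through the new causaltune.remote.remote_exec helper instead of an inline joblib Parallel call. A minimal usage sketch; the import path, the budget value, and the prepared CausalityDataset cd are illustrative assumptions, not taken from this diff:

    # Sketch only: enabling the new Ray path (cd is assumed to be a prepared CausalityDataset)
    from causaltune import CausalTune

    ct = CausalTune(
        time_budget=300,  # seconds for the estimator search
        use_ray=True,     # run trials through Ray; requires ray (pip install flaml[ray])
    )
    ct.fit(data=cd, outcome="outcome")
    # ...or override per call: ct.fit(data=cd, outcome="outcome", use_ray=False)
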
2 changes: 1 addition & 1 deletion causaltune/models/regression.py
@@ -17,7 +17,7 @@ class ElasticNetEstimator(SKLearnEstimator):
ITER_HP = "max_iter"

@classmethod
def search_space(cls, data_size, task="regresssion", **params):
def search_space(cls, data_size, task="regression", **params):
return {
"alpha": {
"domain": tune.loguniform(lower=0.0001, upper=1.0),
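The only change above is the default task string ("regresssion" -> "regression"). For context, a sketch of calling the fixed classmethod; the data_size tuple is an illustrative placeholder, not taken from this diff:

    # Sketch only: inspecting the FLAML search space defined by the estimator
    from causaltune.models.regression import ElasticNetEstimator

    space = ElasticNetEstimator.search_space(data_size=(1000, 10))
    # space["alpha"]["domain"] is a flaml tune.loguniform distribution over [0.0001, 1.0]
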
126 changes: 47 additions & 79 deletions causaltune/optimiser.py
@@ -17,8 +17,6 @@

from econml.inference import BootstrapInference

from joblib import Parallel, delayed

from causaltune.search.params import SimpleParamService
from causaltune.score.scoring import Scorer, metrics_to_minimize
from causaltune.utils import treatment_is_multivalue
@@ -34,13 +32,9 @@
from causaltune.dataset_processor import CausalityDatasetProcessor
from causaltune.models.passthrough import feature_filter

# tune.run = run


# Patched from sklearn.linear_model._base to adjust rtol and atol values
def _check_precomputed_gram_matrix(
X, precompute, X_offset, X_scale, rtol=1e-4, atol=1e-2
):
def _check_precomputed_gram_matrix(X, precompute, X_offset, X_scale, rtol=1e-4, atol=1e-2):
n_features = X.shape[1]
f1 = n_features // 2
f2 = min(f1 + 1, n_features - 1)
@@ -177,24 +171,17 @@ def __init__(
self._settings["tuner"]["time_budget_s"] = time_budget
self._settings["tuner"]["num_samples"] = num_samples
self._settings["tuner"]["verbose"] = verbose
self._settings["tuner"][
"use_ray"
] = use_ray # requires ray to be installed via pip install flaml[ray]
self._settings["tuner"]["resources_per_trial"] = (
resources_per_trial if resources_per_trial is not None else {"cpu": 0.5}
)
self._settings["try_init_configs"] = try_init_configs
self._settings["include_experimental_estimators"] = (
include_experimental_estimators
)
self._settings["include_experimental_estimators"] = include_experimental_estimators

# params for FLAML on component models:
self._settings["component_models"] = {}
self._settings["component_models"]["task"] = components_task
self._settings["component_models"]["verbose"] = components_verbose
self._settings["component_models"][
"pred_time_limit"
] = components_pred_time_limit
self._settings["component_models"]["pred_time_limit"] = components_pred_time_limit
self._settings["component_models"]["n_jobs"] = components_njobs
self._settings["component_models"]["time_budget"] = components_time_budget
self._settings["component_models"]["eval_method"] = "holdout"
@@ -221,6 +208,7 @@ def __init__(
self.causal_model = None
self.identified_estimand = None
self.problem = None
self.use_ray = use_ray
# properties that are used to resume fits (warm start)
self.resume_scores = []
self.resume_cfg = []
@@ -239,9 +227,7 @@ def init_propensity_model(self, propensity_model: str):
self.propensity_model = AutoML(
**{**self._settings["component_models"], "task": "classification"}
)
elif hasattr(propensity_model, "fit") and hasattr(
propensity_model, "predict_proba"
):
elif hasattr(propensity_model, "fit") and hasattr(propensity_model, "predict_proba"):
self.propensity_model = propensity_model
else:
raise ValueError(
@@ -266,9 +252,7 @@ def init_outcome_model(self, outcome_model):
# The current default behavior
return self.auto_outcome_model()
else:
raise ValueError(
'outcome_model valid values are None, "auto", or an estimator object'
)
raise ValueError('outcome_model valid values are None, "auto", or an estimator object')

def auto_outcome_model(self):
data = self.data
@@ -303,6 +287,7 @@ def fit(
preprocess: bool = False,
encoder_type: Optional[str] = None,
encoder_outcome: Optional[str] = None,
use_ray: Optional[bool] = None,
):
"""Performs AutoML on list of causal inference estimators
- If estimator has a search space specified in its parameters, HPO is performed on the whole model.
@@ -326,6 +311,9 @@
Returns:
None
"""
if use_ray is not None:
self.use_ray = use_ray

if outcome is None and isinstance(data, CausalityDataset):
outcome = data.outcomes[0]

@@ -344,19 +332,15 @@ def fit(
if preprocess:
data = copy.deepcopy(data)
self.dataset_processor = CausalityDatasetProcessor()
self.dataset_processor.fit(
data, encoder_type=encoder_type, outcome=encoder_outcome
)
self.dataset_processor.fit(data, encoder_type=encoder_type, outcome=encoder_outcome)
data = self.dataset_processor.transform(data)
else:
self.dataset_processor = None

self.data = data
treatment_values = data.treatment_values

assert (
len(treatment_values) > 1
), "Treatment must take at least 2 values, eg 0 and 1!"
assert len(treatment_values) > 1, "Treatment must take at least 2 values, eg 0 and 1!"

self._control_value = treatment_values[0]
self._treatment_values = list(treatment_values[1:])
@@ -378,8 +362,8 @@ def fit(

self.init_propensity_model(self._settings["propensity_model"])

self.identified_estimand: IdentifiedEstimand = (
self.causal_model.identify_effect(proceed_when_unidentifiable=True)
self.identified_estimand: IdentifiedEstimand = self.causal_model.identify_effect(
proceed_when_unidentifiable=True
)

if bool(self.identified_estimand.estimands["iv"]) and bool(data.instruments):
@@ -450,9 +434,7 @@ def fit(
and self._settings["tuner"]["num_samples"] == -1
):
self._settings["tuner"]["time_budget_s"] = (
2.5
* len(self.estimator_list)
* self._settings["component_models"]["time_budget"]
2.5 * len(self.estimator_list) * self._settings["component_models"]["time_budget"]
)

cmtb = self._settings["component_models"]["time_budget"]
@@ -485,9 +467,7 @@ def fit(
# )
# )

search_space = self.cfg.search_space(
self.estimator_list, data_size=data.data.shape
)
search_space = self.cfg.search_space(self.estimator_list, data_size=data.data.shape)
init_cfg = (
self.cfg.default_configs(self.estimator_list, data_size=data.data.shape)
if self._settings["try_init_configs"]
@@ -507,14 +487,12 @@ def fit(
self._tune_with_config,
search_space,
metric=self.metric,
# use_ray=self.use_ray,
cost_attr="evaluation_cost",
points_to_evaluate=(
init_cfg if len(self.resume_cfg) == 0 else self.resume_cfg
),
evaluated_rewards=(
[] if len(self.resume_scores) == 0 else self.resume_scores
),
points_to_evaluate=(init_cfg if len(self.resume_cfg) == 0 else self.resume_cfg),
evaluated_rewards=([] if len(self.resume_scores) == 0 else self.resume_scores),
mode=("min" if self.metric in metrics_to_minimize() else "max"),
# resources_per_trial= {"cpu": 1} if self.use_ray else None,
low_cost_partial_config={},
**self._settings["tuner"],
)
@@ -529,12 +507,8 @@ def fit(
self._tune_with_config,
search_space,
metric=self.metric,
points_to_evaluate=(
init_cfg if len(self.resume_cfg) == 0 else self.resume_cfg
),
evaluated_rewards=(
[] if len(self.resume_scores) == 0 else self.resume_scores
),
points_to_evaluate=(init_cfg if len(self.resume_cfg) == 0 else self.resume_cfg),
evaluated_rewards=([] if len(self.resume_scores) == 0 else self.resume_scores),
mode=("min" if self.metric in metrics_to_minimize() else "max"),
low_cost_partial_config={},
**self._settings["tuner"],
@@ -568,18 +542,25 @@ def _tune_with_config(self, config: dict) -> dict:
Returns:
(dict): values of metrics after optimisation
"""
estimates = Parallel(n_jobs=2, backend="threading")(
delayed(self._estimate_effect)(config) for i in range(1)
)[0]
from causaltune.remote import remote_exec

if self.use_ray:
# flaml.tune handles the interaction with Ray itself
# estimates = self._estimate_effect(config)
estimates = remote_exec(CausalTune._estimate_effect, (self, config), self.use_ray)
else:
estimates = remote_exec(CausalTune._estimate_effect, (self, config), self.use_ray)

# Parallel(n_jobs=2, backend="threading")(
# delayed(self._estimate_effect)(config) for i in range(1)
# ))[0]

if "exception" not in estimates:
est_name = estimates["estimator_name"]
current_score = estimates[self.metric]

estimates["optimization_score"] = current_score
estimates["evaluation_cost"] = (
1e8 # will be overwritten for successful runs
)
estimates["evaluation_cost"] = 1e8 # will be overwritten for successful runs

# Initialize best_score if this is the first estimator for this name
if est_name not in self._best_estimators:
@@ -611,22 +592,19 @@ def _tune_with_config(self, config: dict) -> dict:
"codec",
"policy_risk",
]:
is_better = (
np.isfinite(current_score) and current_score < best_score
) or (np.isinf(best_score) and np.isfinite(current_score))
is_better = (np.isfinite(current_score) and current_score < best_score) or (
np.isinf(best_score) and np.isfinite(current_score)
)
else:
is_better = (
np.isfinite(current_score) and current_score > best_score
) or (np.isinf(best_score) and np.isfinite(current_score))
is_better = (np.isfinite(current_score) and current_score > best_score) or (
np.isinf(best_score) and np.isfinite(current_score)
)

# Store the estimator if we're storing all, if it's better, or if it's the first valid (non-inf) estimator
if (
self._settings["store_all"]
or is_better
or (
self._best_estimators[est_name][1] is None
and np.isfinite(current_score)
)
or (self._best_estimators[est_name][1] is None and np.isfinite(current_score))
):
self._best_estimators[est_name] = (
current_score,
@@ -658,9 +636,7 @@ def _estimate_effect(self, config):
# Do we need an object property for this, instead of a local var?
self.estimator_name = config["estimator"]["estimator_name"]
outcome_model = self.init_outcome_model(self._settings["outcome_model"])
method_params = self.cfg.method_params(
config, outcome_model, self.propensity_model
)
method_params = self.cfg.method_params(config, outcome_model, self.propensity_model)

try: #
# This calls the causal model's estimate_effect method
Expand Down Expand Up @@ -697,9 +673,7 @@ def _estimate_effect(self, config):
}

def _compute_metrics(self, estimator, df: pd.DataFrame) -> dict:
return self.scorer.make_scores(
estimator, df, self.metrics_to_report, r_scorer=None
)
return self.scorer.make_scores(estimator, df, self.metrics_to_report, r_scorer=None)

def score_dataset(self, df: pd.DataFrame, dataset_name: str):
"""
@@ -714,13 +688,9 @@ def score_dataset(self, df: pd.DataFrame, dataset_name: str):
"""
for scr in self.scores.values():
if scr["estimator"] is None:
warnings.warn(
"Skipping scoring for estimator %s", scr["estimator_name"]
)
warnings.warn("Skipping scoring for estimator %s", scr["estimator_name"])
else:
scr["scores"][dataset_name] = self._compute_metrics(
scr["estimator"], df
)
scr["scores"][dataset_name] = self._compute_metrics(scr["estimator"], df)

@property
def best_estimator(self) -> str:
@@ -793,9 +763,7 @@ def effect(self, df, *args, **kwargs):
"""
return self.model.effect(df, *args, **kwargs)

def predict(
self, cd: CausalityDataset, preprocess: Optional[bool] = False, *args, **kwargs
):
def predict(self, cd: CausalityDataset, preprocess: Optional[bool] = False, *args, **kwargs):
"""Heterogeneous Treatment Effects for data CausalityDataset
Args:
12 changes: 12 additions & 0 deletions causaltune/remote.py
@@ -0,0 +1,12 @@
def remote_exec(function, args, use_ray=False):
if use_ray:
import ray

remote_function = ray.remote(function)
return ray.get(remote_function.remote(*args))
else:
from joblib import Parallel, delayed

return Parallel(n_jobs=2, backend="threading")(delayed(function)(*args) for i in range(1))[
0
]
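
This helper is what _tune_with_config now calls in place of the earlier inline joblib Parallel wrapper: with use_ray=True it wraps the function with ray.remote and blocks on ray.get; otherwise it runs the call once on joblib's threading backend. A toy illustration (the add function is made up for the example):

    from causaltune.remote import remote_exec

    def add(a, b):
        return a + b

    print(remote_exec(add, (1, 2), use_ray=False))  # 3, via joblib's threading backend
    # remote_exec(add, (1, 2), use_ray=True)        # same result via ray.remote, if ray is installed and initialised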