-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmodel_training.py
207 lines (181 loc) · 7.33 KB
/
model_training.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
import argparse
import logging
import sys
import warnings
import mlflow
import pandas as pd
import xgboost as xgb
from loguru import logger
from config.core import PROJECT_ROOT, config
from utils.logging import log_sklearn_model, log_xgboost_model
from utils.metrics import (plot_pr_curve, plot_proba_distribution,
plot_reliability_curve, plot_roc_curve)
from utils.processing import load_versioned_data
from utils.runs import get_last_run, get_run_by_id
warnings.filterwarnings("ignore")
logging.getLogger("mlflow").setLevel(logging.ERROR)
logger.remove()
logger.add(
sys.stdout, format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--data-run-id", default="", type=str)
parser.add_argument("--tuning-run-id", default="", type=str)
parser.add_argument("--data-version", default="", type=str)
cmd_args = parser.parse_args()
DATA_RUN_ID = cmd_args.data_run_id
DVC_REVISION = cmd_args.data_version
PARAMS_RUN_ID = cmd_args.tuning_run_id
logger.info("Model training started")
mlflow.set_tracking_uri(config.project.tracking_uri)
mlflow.xgboost.autolog(
importance_types=config.model.importance_types,
log_datasets=False, # will be logged manually for better control
log_models=False # will be logged manually for better control
)
mlflow.sklearn.autolog(
log_datasets=False,
log_models=False,
serialization_format=mlflow.sklearn.SERIALIZATION_FORMAT_CLOUDPICKLE
)
# for more: https://mlflow.org/docs/latest/python_api/mlflow.xgboost.html#mlflow.xgboost.autolog # noqa
with mlflow.start_run(log_system_metrics=True) as run:
experiment_id = run.info.experiment_id
run_id = run.info.run_id
logger.info(f"Starting MLflow run: {run_id}")
if not DATA_RUN_ID:
# get last finished run for data preprocessing
if not DVC_REVISION:
data_run = get_last_run(
experiment_id, "Data_Preprocessing", logger)
else:
# filter by dataset_version tag
data_run = get_last_run(
experiment_id, "Data_Preprocessing", logger,
dataset_version=DVC_REVISION)
else:
# get data preprocessing run with specified run id
data_run = get_run_by_id(
experiment_id, DATA_RUN_ID, logger)
# download train and test data
train = load_versioned_data(
run_id=data_run["run_id"],
dataset_name=config.project.train_dataset_name,
logger=logger,
log_usage=True,
targets=config.model.target_name,
context="training"
)
test = load_versioned_data(
run_id=data_run["run_id"],
dataset_name=config.project.test_dataset_name,
logger=logger,
log_usage=True,
targets=config.model.target_name,
context="testing"
)
# convert to DMatrix format
features = [i for i in train.columns if i != config.model.target_name]
dtrain = xgb.DMatrix(
data=train[features],
label=train[config.model.target_name]
)
dtest = xgb.DMatrix(
data=test[features],
label=test[config.model.target_name]
)
if not PARAMS_RUN_ID:
# get last finished parent run for hyperparameters tuning
tuning_run = get_last_run(
experiment_id, "Hyperparameters_Search", logger)
else:
# get hyperparameters tuning run with specified run id
tuning_run = get_run_by_id(
experiment_id, PARAMS_RUN_ID, logger)
# get best params
params = {
col.split(".")[1]: tuning_run[col]
for col in tuning_run.index if (
("params" in col) and (all(
p not in col
for p in ["n-trials", "data-run-id", "data-version"]
))
)
}
params.update(eval_metric=config.model.params_eval_metrics)
mlflow.log_params(params)
model = xgb.train(
params,
dtrain,
num_boost_round=int(params["num_boost_round"]),
evals=[(dtest, "test")],
verbose_eval=False,
early_stopping_rounds=max(
int(int(params["num_boost_round"]) * config.model.early_stopping_heuristic), # noqa
1
)
)
logger.info("Best iteration test_{}: {}".format(
config.model.params_tuning_metric, model.best_score))
local_models_path = PROJECT_ROOT / config.model.save_dir
local_models_path.mkdir(exist_ok=True, parents=True)
# log and register model
input_example = test.loc[0:10, features]
predictions_example = pd.DataFrame(
model.predict(xgb.DMatrix(input_example)),
columns=["predictions"]
)
log_xgboost_model(
model,
artifact_path="booster",
input_example=input_example,
prediction_example=predictions_example.to_json(
orient="split", index=False),
model_name=config.model.name,
model_alias=config.model.champion_alias,
mlflow_save_format=config.model.mlflow_save_format,
local_save_format=config.model.local_save_format,
local_models_path=local_models_path
)
# log additional metrics and plots
test_predicitions = model.predict(xgb.DMatrix(test[features]))
roc_curve_fig = plot_roc_curve(
test[config.model.target_name].values,
test_predicitions
)
mlflow.log_figure(roc_curve_fig, "test_roc_curve.png")
pr_curve_fig = plot_pr_curve(
test[config.model.target_name].values,
test_predicitions
)
mlflow.log_figure(pr_curve_fig, "test_pr_curve.png")
calibr_curve_fig = plot_reliability_curve(
test[config.model.target_name].values,
test_predicitions,
n_bins=5
)
if calibr_curve_fig is not None:
mlflow.log_figure(calibr_curve_fig, "test_calibration_curve.png")
probs_dist_fig = plot_proba_distribution(test_predicitions)
mlflow.log_figure(probs_dist_fig, "test_probability_distribution.png")
# TODO: add logging custom artifacts
# log and register model as sklearn compatible classifier
params.update(num_boost_round=model.best_iteration)
skl_model = xgb.XGBClassifier(**params)
skl_model.fit(train[features], train[config.model.target_name])
predictions_example = pd.DataFrame(
skl_model.predict_proba(input_example)[:, 1],
columns=["predictions"]
)
log_sklearn_model(
skl_model,
artifact_path="sklearn",
input_example=input_example,
prediction_example=predictions_example.to_json(
orient="split", index=False),
model_name=config.model.name,
model_alias=config.model.champion_alias,
local_models_path=local_models_path,
model_name_suffix="_sklearn"
)
logger.success("Model training finished")