Skip to content

Commit

Permalink
Use train model horizons specified in prediction job correctly
Browse files Browse the repository at this point in the history
Signed-off-by: Martijn Cazemier <[email protected]>
  • Loading branch information
MartijnCa committed Dec 7, 2023
1 parent d3d4682 commit c1cc311
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 19 deletions.
18 changes: 10 additions & 8 deletions openstef/pipeline/optimize_hyperparameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# SPDX-License-Identifier: MPL-2.0
import os
from typing import Any
from typing import Any, Union

import optuna
import pandas as pd
Expand All @@ -22,7 +22,7 @@
from openstef.model.regressors.regressor import OpenstfRegressor
from openstef.model.serializer import MLflowSerializer
from openstef.pipeline.train_model import (
DEFAULT_TRAIN_HORIZONS,
DEFAULT_TRAIN_HORIZONS_HOURS,
train_model_pipeline_core,
)
from openstef.validation import validation
Expand All @@ -43,7 +43,6 @@ def optimize_hyperparameters_pipeline(
input_data: pd.DataFrame,
mlflow_tracking_uri: str,
artifact_folder: str,
horizons: list[float] = DEFAULT_TRAIN_HORIZONS,
n_trials: int = N_TRIALS,
) -> dict:
"""Optimize hyperparameters pipeline.
Expand All @@ -65,6 +64,12 @@ def optimize_hyperparameters_pipeline(
Optimized hyperparameters.
"""
if pj.train_horizons_minutes is None:
horizons = DEFAULT_TRAIN_HORIZONS_HOURS
else:
horizons = [
horizon_minutes / 60 for horizon_minutes in pj.train_horizons_minutes
]
(
best_model,
model_specs,
Expand Down Expand Up @@ -97,7 +102,7 @@ def optimize_hyperparameters_pipeline(
def optimize_hyperparameters_pipeline_core(
pj: PredictionJobDataClass,
input_data: pd.DataFrame,
horizons: list[float] = DEFAULT_TRAIN_HORIZONS,
horizons: Union[list[float], str] = DEFAULT_TRAIN_HORIZONS_HOURS,
n_trials: int = N_TRIALS,
) -> tuple[
OpenstfRegressor, ModelSpecificationDataClass, Report, dict, int, dict[str, Any]
Expand All @@ -109,7 +114,7 @@ def optimize_hyperparameters_pipeline_core(
Args:
pj: Prediction job
input_data: Raw training input data
horizons: horizons for feature engineering.
horizons: horizons for feature engineering in hours.
n_trials: The number of trials. Defaults to N_TRIALS.
Raises:
Expand All @@ -124,9 +129,6 @@ def optimize_hyperparameters_pipeline_core(
- Optimized hyperparameters.
"""
if pj.train_horizons_minutes is not None:
horizons = pj.train_horizons_minutes

if input_data.empty:
raise InputDataInsufficientError("Input dataframe is empty")
elif "load" not in input_data.columns:
Expand Down
23 changes: 12 additions & 11 deletions openstef/pipeline/train_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# SPDX-License-Identifier: MPL-2.0
import logging
import os
from typing import Optional, Union, Tuple
from typing import Optional, Union

import pandas as pd
import structlog
Expand All @@ -25,7 +25,7 @@
from openstef.model_selection.model_selection import split_data_train_validation_test
from openstef.validation import validation

DEFAULT_TRAIN_HORIZONS: list[float] = [0.25, 47.0]
DEFAULT_TRAIN_HORIZONS_HOURS: list[float] = [0.25, 47.0]
MAXIMUM_MODEL_AGE: int = 7

DEFAULT_EARLY_STOPPING_ROUNDS: int = 10
Expand Down Expand Up @@ -80,12 +80,19 @@ def train_model_pipeline(

# Train model with core pipeline
try:
if pj.train_horizons_minutes is None:
horizons = DEFAULT_TRAIN_HORIZONS_HOURS
else:
horizons = [
horizon_minutes / 60 for horizon_minutes in pj.train_horizons_minutes
]

model, report, model_specs_updated, data_sets = train_model_pipeline_core(
pj,
model_specs,
input_data,
old_model,
horizons=pj.train_horizons_minutes,
horizons=horizons,
)
except OldModelHigherScoreError as OMHSE:
logger.error("Old model is better than new model", pid=pj["id"], exc_info=OMHSE)
Expand Down Expand Up @@ -134,7 +141,7 @@ def train_model_pipeline_core(
model_specs: ModelSpecificationDataClass,
input_data: pd.DataFrame,
old_model: OpenstfRegressor = None,
horizons: Union[list[float], str] = None,
horizons: Union[list[float], str] = DEFAULT_TRAIN_HORIZONS_HOURS,
) -> Union[
OpenstfRegressor,
Report,
Expand All @@ -151,7 +158,7 @@ def train_model_pipeline_core(
model_specs: Dataclass containing model specifications
input_data: Input data
old_model: Old model to compare to. Defaults to None.
horizons: horizons to train on in hours.
horizons: Horizons to train on in hours, relevant for feature engineering.
Raises:
InputDataInsufficientError: when input data is insufficient.
Expand All @@ -165,12 +172,6 @@ def train_model_pipeline_core(
- Datasets (tuple[pd.DataFrmae, pd.DataFrame, pd.Dataframe): The train, validation and test sets
"""
if horizons is None:
if pj.train_horizons_minutes is None:
horizons = DEFAULT_TRAIN_HORIZONS
else:
horizons = pj.train_horizons_minutes

logger = structlog.get_logger(__name__)

# Call common pipeline
Expand Down

0 comments on commit c1cc311

Please sign in to comment.