Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/upgrade customer satisfaction #84

Merged
merged 13 commits into from
Dec 22, 2023
2 changes: 1 addition & 1 deletion .typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ connexion = "connexion"
[default.extend-words]
# aks = "aks"
GOES = "GOES"

lenght = "lenght"

[default]
locale = "en-us"
187 changes: 138 additions & 49 deletions customer-satisfaction/README.md
AlexejPenner marked this conversation as resolved.
Show resolved Hide resolved

Large diffs are not rendered by default.

Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added customer-satisfaction/_assets/StreamlitApp.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added customer-satisfaction/_assets/mlflow_stack.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
15 changes: 8 additions & 7 deletions customer-satisfaction/config.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
enable_cache: False
extra:
tags: zenml-projects

settings:
docker:
required_integrations:
- mlflow
steps:
model_train:
experiment_tracker: mlflow_tracker
evaluation:
experiment_tracker: mlflow_tracker

# configuration of the Model Control Plane
model_version:
name: Customer_Satisfaction_Predictor
license: Apache 2.0
description: Predictor of Customer Satisfaction.
tags: ["classification", "customer_satisfaction"]
80 changes: 0 additions & 80 deletions customer-satisfaction/materializer/custom_materializer.py

This file was deleted.

3 changes: 3 additions & 0 deletions customer-satisfaction/model/data_cleaning.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ def preprocess_data(self) -> pd.DataFrame:
]
self.df = self.df.drop(cols_to_drop, axis=1)

# Catch-all fillna in case any missing values were missed above
self.df.fillna(self.df.mean(), inplace=True)

return self.df
except Exception as e:
logging.error(e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from sklearn.metrics import mean_squared_error, r2_score


class Evaluation:
class Evaluator:
AlexejPenner marked this conversation as resolved.
Show resolved Hide resolved
"""
Evaluation class which evaluates the model performance using the sklearn metrics.
"""
Expand Down
15 changes: 8 additions & 7 deletions customer-satisfaction/model/model_dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pandas as pd
import xgboost as xgb
from lightgbm import LGBMRegressor
from sklearn.base import RegressorMixin
from sklearn.ensemble import RandomForestRegressor


Expand Down Expand Up @@ -79,7 +80,7 @@ def optimize_xgboost_regressor(self, trial: optuna.Trial) -> float:
return val_accuracy


class ModelTraining:
class ModelTrainer:
"""
Class for training models.
"""
Expand All @@ -97,7 +98,7 @@ def __init__(
self.x_test = x_test
self.y_test = y_test

def random_forest_trainer(self, fine_tuning: bool = True):
def random_forest_trainer(self, fine_tuning: bool = True) -> RegressorMixin:
"""
It trains the random forest model.

Expand All @@ -113,7 +114,7 @@ def random_forest_trainer(self, fine_tuning: bool = True):
self.x_train, self.y_train, self.x_test, self.y_test
)
study = optuna.create_study(direction="maximize")
study.optimize(hyper_opt.optimize_randomforest, n_trials=100)
study.optimize(hyper_opt.optimize_randomforest, n_trials=10)
trial = study.best_trial
n_estimators = trial.params["n_estimators"]
max_depth = trial.params["max_depth"]
Expand All @@ -137,7 +138,7 @@ def random_forest_trainer(self, fine_tuning: bool = True):
logging.error(e)
return None

def lightgbm_trainer(self, fine_tuning: bool = True):
def lightgbm_trainer(self, fine_tuning: bool = True) -> RegressorMixin:
"""
It trains the LightGBM model.

Expand All @@ -153,7 +154,7 @@ def lightgbm_trainer(self, fine_tuning: bool = True):
self.x_train, self.y_train, self.x_test, self.y_test
)
study = optuna.create_study(direction="maximize")
study.optimize(hyper_opt.optimize_lightgbm, n_trials=100)
study.optimize(hyper_opt.optimize_lightgbm, n_trials=10)
trial = study.best_trial
n_estimators = trial.params["n_estimators"]
max_depth = trial.params["max_depth"]
Expand All @@ -176,7 +177,7 @@ def lightgbm_trainer(self, fine_tuning: bool = True):
logging.error(e)
return None

def xgboost_trainer(self, fine_tuning: bool = True):
def xgboost_trainer(self, fine_tuning: bool = True) -> RegressorMixin:
"""
It trains the xgboost model.

Expand All @@ -192,7 +193,7 @@ def xgboost_trainer(self, fine_tuning: bool = True):
self.x_train, self.y_train, self.x_test, self.y_test
)
study = optuna.create_study(direction="maximize")
study.optimize(hy_opt.optimize_xgboost_regressor, n_trials=100)
study.optimize(hy_opt.optimize_xgboost_regressor, n_trials=10)
trial = study.best_trial
n_estimators = trial.params["n_estimators"]
learning_rate = trial.params["learning_rate"]
Expand Down
166 changes: 33 additions & 133 deletions customer-satisfaction/pipelines/deployment_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,146 +1,46 @@
import json
import os

import numpy as np
import pandas as pd
from materializer.custom_materializer import cs_materializer
from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (
MLFlowModelDeployer,
)
from zenml.integrations.mlflow.services import MLFlowDeploymentService
from zenml.pipelines import pipeline
from zenml.steps import BaseParameters, Output, step
from zenml.integrations.mlflow.steps import mlflow_model_deployer_step

from .utils import get_data_for_test
from zenml import pipeline, ModelVersion

requirements_file = os.path.join(os.path.dirname(__file__), "requirements.txt")


@step(enable_cache=False, output_materializers=cs_materializer)
def dynamic_importer() -> Output(data=str):
"""Downloads the latest data from a mock API."""
data = get_data_for_test()
return data


class DeploymentTriggerConfig(BaseParameters):
"""Parameters that are used to trigger the deployment"""

min_accuracy: float


@step
def deployment_trigger(
accuracy: float,
config: DeploymentTriggerConfig,
) -> bool:
"""Implements a simple model deployment trigger that looks at the
input model accuracy and decides if it is good enough to deploy"""
from pipelines.training_pipeline import customer_satisfaction_training_pipeline
from steps import predictor
from steps.dynamic_importer import dynamic_importer
from steps.model_loader import model_loader
from steps.prediction_service_loader import prediction_service_loader

return accuracy > config.min_accuracy


class MLFlowDeploymentLoaderStepParameters(BaseParameters):
"""MLflow deployment getter parameters

Attributes:
pipeline_name: name of the pipeline that deployed the MLflow prediction
server
step_name: the name of the step that deployed the MLflow prediction
server
running: when this flag is set, the step only returns a running service
model_name: the name of the model that is deployed
"""

pipeline_name: str
step_name: str
running: bool = True


@step(enable_cache=False)
def prediction_service_loader(
params: MLFlowDeploymentLoaderStepParameters,
) -> MLFlowDeploymentService:
"""Get the prediction service started by the deployment pipeline"""
requirements_file = os.path.join(os.path.dirname(__file__), "requirements.txt")

# get the MLflow model deployer stack component
model_deployer = MLFlowModelDeployer.get_active_model_deployer()

# fetch existing services with same pipeline name, step name and model name
existing_services = model_deployer.find_model_server(
pipeline_name=params.pipeline_name,
pipeline_step_name=params.step_name,
running=params.running,
@pipeline
def continuous_deployment_pipeline(
model_type: str = "lightgbm"
):
"""Run a training job and deploy an mlflow model deployment."""
# Run a training pipeline
customer_satisfaction_training_pipeline(model_type=model_type)

# Fetch the production model from the Model Registry
production_model = model_loader(
model_name="Customer_Satisfaction_Predictor",
after="model_promoter" # Make sure this runs only once the training pipeline is done
)

if not existing_services:
raise RuntimeError(
f"No MLflow prediction service deployed by the "
f"{params.step_name} step in the {params.pipeline_name} "
f"pipeline is currently "
f"running."
)

return existing_services[0]


@step()
def predictor(
service: MLFlowDeploymentService,
data: str,
) -> np.ndarray:
"""Run an inference request against a prediction service"""

service.start(timeout=10) # should be a NOP if already started
data = json.loads(data)
data.pop("columns")
data.pop("index")
columns_for_df = [
"payment_sequential",
"payment_installments",
"payment_value",
"price",
"freight_value",
"product_name_lenght",
"product_description_lenght",
"product_photos_qty",
"product_weight_g",
"product_length_cm",
"product_height_cm",
"product_width_cm",
]
df = pd.DataFrame(data["data"], columns=columns_for_df)
json_list = json.loads(json.dumps(list(df.T.to_dict().values())))
data = np.array(json_list)
prediction = service.predict(data)
return prediction
# (Re)deploy the production model
mlflow_model_deployer_step(
workers=3,
deploy_decision=True,
model=production_model
)


@pipeline(enable_cache=False)
def continuous_deployment_pipeline(
ingest_data,
clean_data,
model_train,
evaluation,
deployment_trigger,
model_deployer,
):
# Link all the steps artifacts together
df = ingest_data()
x_train, x_test, y_train, y_test = clean_data(df)
model = model_train(x_train, x_test, y_train, y_test)
mse, rmse = evaluation(model, x_test, y_test)
deployment_decision = deployment_trigger(accuracy=mse)
model_deployer(deployment_decision, model)


@pipeline
def inference_pipeline(
dynamic_importer,
prediction_service_loader,
predictor,
):
# Link all the steps artifacts together
def inference_pipeline():
"""Run a batch inference job with data loaded from an API."""
batch_data = dynamic_importer()
model_deployment_service = prediction_service_loader()
predictor(model_deployment_service, batch_data)
model_deployment_service = prediction_service_loader(
pipeline_name="continuous_deployment_pipeline",
step_name="mlflow_model_deployer_step"
)
predictor(service=model_deployment_service, input_data=batch_data)
Loading
Loading