Skip to content

Commit

Permalink
Implement actual model promotion
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexejPenner committed Dec 20, 2023
1 parent b892b58 commit f9420f1
Show file tree
Hide file tree
Showing 12 changed files with 219 additions and 172 deletions.
157 changes: 24 additions & 133 deletions customer-satisfaction/pipelines/deployment_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,147 +1,38 @@
import json
import os
from typing import Annotated

import numpy as np
import pandas as pd
from materializer.custom_materializer import cs_materializer
from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (
MLFlowModelDeployer,
)
from zenml.integrations.mlflow.services import MLFlowDeploymentService
from zenml import step, pipeline
from zenml.steps import BaseParameters

from .utils import get_data_for_test

requirements_file = os.path.join(os.path.dirname(__file__), "requirements.txt")


@step(enable_cache=False, output_materializers=cs_materializer)
def dynamic_importer() -> Annotated[str, "data"]:
    """Fetch the latest batch of data from the mock API.

    Caching is disabled on the step, and the output is written with
    ``cs_materializer``.

    Returns:
        The value produced by ``get_data_for_test``, exposed as the
        ``data`` output.
    """
    return get_data_for_test()


class DeploymentTriggerConfig(BaseParameters):
    """Configuration consumed by the deployment trigger step."""

    # Threshold that the ``deployment_trigger`` step compares the model's
    # score against (strictly greater than).
    min_accuracy: float


@step
def deployment_trigger(
    accuracy: float,
    config: DeploymentTriggerConfig,
) -> bool:
    """Decide whether a model scores well enough to be deployed.

    Args:
        accuracy: Score of the candidate model.
        config: Trigger configuration carrying ``min_accuracy``.

    Returns:
        True only when ``accuracy`` is strictly greater than
        ``config.min_accuracy``; equality does not trigger a deployment.
    """
    threshold = config.min_accuracy
    return accuracy > threshold
from zenml.integrations.mlflow.steps import mlflow_model_deployer_step

from zenml import pipeline

class MLFlowDeploymentLoaderStepParameters(BaseParameters):
"""MLflow deployment getter parameters
from pipelines.training_pipeline import customer_satisfaction_training_pipeline
from pipelines.utils import model_version
from steps import ingest_data, clean_data, train_model, evaluation, predictor
from steps.dynamic_importer import dynamic_importer
from steps.model_loader import model_loader
from steps.prediction_service_loader import prediction_service_loader

Attributes:
pipeline_name: name of the pipeline that deployed the MLflow prediction
server
step_name: the name of the step that deployed the MLflow prediction
server
running: when this flag is set, the step only returns a running service
model_name: the name of the model that is deployed
"""

pipeline_name: str
step_name: str
running: bool = True


@step(enable_cache=False)
def prediction_service_loader(
params: MLFlowDeploymentLoaderStepParameters,
) -> MLFlowDeploymentService:
"""Get the prediction service started by the deployment pipeline"""
requirements_file = os.path.join(os.path.dirname(__file__), "requirements.txt")

# get the MLflow model deployer stack component
model_deployer = MLFlowModelDeployer.get_active_model_deployer()

# fetch existing services with same pipeline name, step name and model name
existing_services = model_deployer.find_model_server(
pipeline_name=params.pipeline_name,
pipeline_step_name=params.step_name,
running=params.running,
@pipeline(enable_cache=False, model_version=model_version)
def continuous_deployment_pipeline():
    """Compose training with an MLflow deployment of the production model.

    The training pipeline is called, the model registered under
    ``model_version.name`` is loaded through ``model_loader``, and the
    result is handed to the MLflow deployer step with
    ``deploy_decision=True`` and three workers.
    """
    # The training outputs are not consumed below; the two-value unpacking
    # is kept so the call shape is unchanged.
    _trained_model, _was_promoted = customer_satisfaction_training_pipeline()

    # Fetch the production model from the Model Registry.
    # NOTE(review): no artifact flows from the training call into
    # model_loader, so nothing visible here forces the loader to wait for
    # the promotion step -- confirm whether an explicit ordering is needed.
    registered_name = model_version.name
    production_model = model_loader(registered_name)

    mlflow_model_deployer_step(
        model=production_model,
        deploy_decision=True,
        workers=3,
    )

if not existing_services:
raise RuntimeError(
f"No MLflow prediction service deployed by the "
f"{params.step_name} step in the {params.pipeline_name} "
f"pipeline is currently "
f"running."
)

return existing_services[0]


@step()
def predictor(
    service: MLFlowDeploymentService,
    data: str,
) -> np.ndarray:
    """Send a batch of records to a prediction service and return its answer.

    Args:
        service: Prediction service to query; it is started first with a
            10 second timeout.
        data: JSON document with ``columns``, ``index`` and ``data``
            entries. Only ``data`` is used to build the request.

    Returns:
        Whatever ``service.predict`` returns for the request.

    Raises:
        KeyError: If ``columns`` or ``index`` is missing from the document.
    """
    # Should be a no-op when the service is already running.
    service.start(timeout=10)

    payload = json.loads(data)
    # Both entries are discarded; the column names are fixed below instead.
    payload.pop("columns")
    payload.pop("index")

    feature_names = [
        "payment_sequential",
        "payment_installments",
        "payment_value",
        "price",
        "freight_value",
        "product_name_lenght",
        "product_description_lenght",
        "product_photos_qty",
        "product_weight_g",
        "product_length_cm",
        "product_height_cm",
        "product_width_cm",
    ]
    frame = pd.DataFrame(payload["data"], columns=feature_names)

    # One dict per row, round-tripped through JSON so the values end up as
    # plain JSON-compatible Python objects before being wrapped in an array.
    row_dicts = list(frame.T.to_dict().values())
    request = np.array(json.loads(json.dumps(row_dicts)))
    return service.predict(request)


@pipeline(enable_cache=False)
def continuous_deployment_pipeline(
    ingest_data,
    clean_data,
    model_train,
    evaluation,
    deployment_trigger,
    model_deployer,
):
    """Link ingestion, cleaning, training, evaluation and deployment steps.

    Every parameter is a step supplied by the caller; the body only wires
    the output of one step into the input of the next.
    """
    raw_frame = ingest_data()
    x_train, x_test, y_train, y_test = clean_data(raw_frame)
    trained_model = model_train(x_train, x_test, y_train, y_test)

    # NOTE(review): the original locals were named ``mse, rmse``, while the
    # evaluation step in steps/evaluation.py annotates its outputs as
    # ``r2_score`` and ``rmse`` -- confirm which metric is really passed on.
    first_metric, _second_metric = evaluation(trained_model, x_test, y_test)

    should_deploy = deployment_trigger(accuracy=first_metric)
    model_deployer(should_deploy, trained_model)


@pipeline
def inference_pipeline(
dynamic_importer,
prediction_service_loader,
predictor,
):
# Link all the steps artifacts together
def inference_pipeline():
"""Run a batch inference job with data loaded from an API."""
batch_data = dynamic_importer()
model_deployment_service = prediction_service_loader()
predictor(model_deployment_service, batch_data)
25 changes: 11 additions & 14 deletions customer-satisfaction/pipelines/training_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,21 @@
from zenml.config import DockerSettings
from zenml.integrations.constants import MLFLOW
from zenml import pipeline
from zenml.model.model_version import ModelVersion

from steps import ingest_data, clean_data, train_model, evaluation
from typing import Tuple, Annotated

from sklearn.base import RegressorMixin
from zenml import pipeline

# Docker settings passed to the @pipeline decorator below; they require the
# MLflow integration.
docker_settings = DockerSettings(required_integrations=[MLFLOW])
# Model version configuration passed to the @pipeline decorator below.
# NOTE(review): the "classification" tag looks inconsistent with the
# RegressorMixin return annotation of the training pipeline -- confirm.
model_version = ModelVersion(
name="Customer_Satisfaction_Predictor",
description="Predictor of Customer Satisfaction.",
delete_new_version_on_failure=True,
tags=["classification", "customer_satisfaction"],
)
from pipelines.utils import docker_settings, model_version
from steps import ingest_data, clean_data, train_model, evaluation, \
model_promoter


@pipeline(
enable_cache=True,
settings={"docker": docker_settings},
model_version=model_version
)
def customer_satisfaction_training_pipeline(model_type: str = "lightgbm"):
def customer_satisfaction_training_pipeline(
model_type: str = "lightgbm"
) -> Tuple[Annotated[RegressorMixin, "model"], Annotated[bool, "is_promoted"]]:
"""Training Pipeline.
Args:
Expand All @@ -30,3 +25,5 @@ def customer_satisfaction_training_pipeline(model_type: str = "lightgbm"):
x_train, x_test, y_train, y_test = clean_data(df)
model = train_model(x_train=x_train, x_test=x_test, y_train=y_train, y_test=y_test, model_type=model_type)
mse, rmse = evaluation(model, x_test, y_test)
is_promoted = model_promoter(accuracy=mse)
return model, is_promoted
13 changes: 13 additions & 0 deletions customer-satisfaction/pipelines/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import logging

import pandas as pd
from zenml import ModelVersion
from zenml.config import DockerSettings
from zenml.integrations.constants import MLFLOW

from model.data_cleaning import DataCleaning


Expand All @@ -16,3 +20,12 @@ def get_data_for_test():
except Exception as e:
logging.error(e)
raise e


# Docker settings imported by the training pipeline module; they require the
# MLflow integration.
docker_settings = DockerSettings(required_integrations=[MLFLOW])
# Model version configuration imported by the training and deployment
# pipeline modules.
# NOTE(review): the "classification" tag looks inconsistent with the
# RegressorMixin types used by the pipelines and steps -- confirm.
model_version = ModelVersion(
name="Customer_Satisfaction_Predictor",
description="Predictor of Customer Satisfaction.",
delete_new_version_on_failure=True,
tags=["classification", "customer_satisfaction"],
)
25 changes: 5 additions & 20 deletions customer-satisfaction/run_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@
from zenml.client import Client

from pipelines.deployment_pipeline import (
DeploymentTriggerConfig,
MLFlowDeploymentLoaderStepParameters,
continuous_deployment_pipeline,
deployment_trigger,
dynamic_importer,
inference_pipeline,
prediction_service_loader,
predictor,
)
from steps.dynamic_importer import dynamic_importer
from steps.deployment_trigger import deployment_trigger
from steps.prediction_service_loader import prediction_service_loader
from rich import print
from steps.clean_data import clean_data
from steps.evaluation import evaluation
Expand Down Expand Up @@ -55,27 +53,14 @@ def run_main(min_accuracy: float, stop_service: bool, model_name="Customer_Satis
existing_services[0].stop(timeout=10)
return

deployment = continuous_deployment_pipeline(
ingest_data=ingest_data(),
clean_data=clean_data(),
model_train=train_model(),
evaluation=evaluation(),
deployment_trigger=deployment_trigger(
config=DeploymentTriggerConfig(
min_accuracy=min_accuracy,
)
),
model_deployer=mlflow_model_deployer_step(workers=3),
)
deployment = continuous_deployment_pipeline(min_accuracy=min_accuracy)
deployment.run(config_path="config.yaml")

inference = inference_pipeline(
dynamic_importer=dynamic_importer(),
prediction_service_loader=prediction_service_loader(
MLFlowDeploymentLoaderStepParameters(
pipeline_name="continuous_deployment_pipeline",
step_name="model_deployer",
)
step_name="model_deployer"
),
predictor=predictor(),
)
Expand Down
8 changes: 7 additions & 1 deletion customer-satisfaction/steps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,10 @@
)
from .train_model import (
train_model,
)
)
from .predictor import (
predictor
)
from .model_promoter import (
model_promoter
)
13 changes: 13 additions & 0 deletions customer-satisfaction/steps/dynamic_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from typing import Annotated

from zenml import step

from materializer.custom_materializer import cs_materializer
from pipelines.utils import get_data_for_test


@step(enable_cache=False, output_materializers=cs_materializer)
def dynamic_importer() -> Annotated[str, "data"]:
    """Fetch the latest batch of data from the mock API.

    Caching is disabled on the step, and the output is written with
    ``cs_materializer``.

    Returns:
        The value produced by ``get_data_for_test``, exposed as the
        ``data`` output.
    """
    return get_data_for_test()
4 changes: 3 additions & 1 deletion customer-satisfaction/steps/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ def evaluation(
Annotated[float, "r2_score"],
Annotated[float, "rmse"]
]:
"""
"""Evaluates the Model on the Test Dataset and returns the metrics.
Args:
model: RegressorMixin
x_test: pd.DataFrame
y_test: pd.Series
Returns:
r2_score: float
rmse: float
Expand Down
5 changes: 2 additions & 3 deletions customer-satisfaction/steps/ingest_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ def get_data(self) -> pd.DataFrame:

@step
def ingest_data() -> pd.DataFrame:
"""
Args:
None
""" Ingest Data and return a Dataframe with the whole dataset.
Returns:
df: pd.DataFrame
"""
Expand Down
21 changes: 21 additions & 0 deletions customer-satisfaction/steps/model_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from sklearn.base import RegressorMixin
from zenml import step, ModelVersion
from zenml.client import Client


@step
def model_loader(
    model_name: str
) -> RegressorMixin:
    """Load the ``model`` artifact of a model's production version.

    Args:
        model_name: Name of the model to load.

    Returns:
        The artifact stored under the name ``"model"`` for the version
        addressed as ``"production"``.
    """
    production_version = ModelVersion(name=model_name, version="production")
    return production_version.load_artifact("model")
Loading

0 comments on commit f9420f1

Please sign in to comment.