diff --git a/customer-satisfaction/README.md b/customer-satisfaction/README.md
index e1cc423b..5e06a596 100644
--- a/customer-satisfaction/README.md
+++ b/customer-satisfaction/README.md
@@ -117,27 +117,25 @@ Our standard training pipeline consists of several steps:
   using [MLflow autologging](https://www.mlflow.org/docs/latest/tracking.html).
 - `evaluation`: This step will evaluate the model and save the metrics -- using
   MLflow autologging -- into the artifact store.
+- `model_promoter`: This step compares the newly trained model against the previous
+  production model; if the new model performs better, it is promoted.
 
 ### Deployment Pipeline
 
 We have another pipeline, the `deployment_pipeline.py`, that extends the
 training pipeline, and implements a continuous deployment workflow. It ingests
 and processes input data, trains a model and then (re)deploys the prediction
-server that serves the model if it meets our evaluation criteria. The criteria
-that we have chosen is a configurable threshold on
-the [MSE](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.mean_squared_error.html)
-of the training. The first four steps of the pipeline are the same as above, but
+server that serves the model if it meets the promotion criteria.
+The first five steps of the pipeline are the same as above, but
 we have added the following additional ones:
 
-- `deployment_trigger`: The step checks whether the newly trained model meets
-  the criteria set for deployment.
+- `model_loader`: This step loads the current `production` model from the ZenML Model Registry.
 - `model_deployer`: This step deploys the model as a service using MLflow (if
   deployment criteria is met).
 
-In the deployment pipeline, ZenML's MLflow tracking integration is used for
-logging the hyperparameter values and the trained model itself and the model
-evaluation metrics -- as MLflow experiment tracking artifacts -- into the local
-MLflow backend. This pipeline also launches a local MLflow deployment server to
+In the deployment pipeline, ZenML's Model Control Plane is used to
+attach the evaluation metrics as metadata to the trained model.
+This pipeline also launches a local MLflow deployment server to
 serve the latest MLflow model if its accuracy is above a configured threshold.
 
 The MLflow deployment server runs locally as a daemon process that will continue
@@ -151,9 +149,9 @@ model service asynchronously from the pipeline logic. This can be done easily
 with ZenML within the Streamlit code:
 
 ```python
-service = load_last_service_from_step(
+service = model_deployer.find_model_server(
     pipeline_name="continuous_deployment_pipeline",
-    step_name="model_deployer",
+    pipeline_step_name="mlflow_model_deployer_step",
     running=True,
 )
 ...
@@ -200,28 +198,7 @@ streamlit run streamlit_app.py
 
 ## :question: FAQ
 
-1. When running the continuous deployment pipeline, I get an error
-   stating: `No Step found for the name mlflow_deployer`.
-
-   Solution: It happens because your artifact store is overridden after running
-   the continuous deployment pipeline. So, you need to delete the artifact store
-   and rerun the pipeline. You can get the location of the artifact store by
-   running the following command:
-
-   ```bash
-   zenml artifact-store describe
-   ```
-
-   and then you can delete the artifact store with the following command:
-
-   **Note**: This is a dangerous / destructive command! Please enter your path
-   carefully, otherwise it may delete other folders from your computer.
-
-   ```bash
-   rm -rf PATH
-   ```
-
-2. When running the continuous deployment pipeline, I get the following
+1. When running the continuous deployment pipeline, I get the following
    error: `No Environment component with name mlflow is currently registered.`
 
    Solution: You forgot to install the MLflow integration in your ZenML
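The README snippet above switches to the `find_model_server` API. For context, here is a minimal sketch of how a Streamlit app might consume the deployed prediction service with that API; the example payload shape and values are purely illustrative and are not the project's actual feature set.

```python
import numpy as np

from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (
    MLFlowModelDeployer,
)

# Fetch the MLflow model deployer registered in the active ZenML stack
model_deployer = MLFlowModelDeployer.get_active_model_deployer()

# find_model_server returns a list of matching services; take the first one
services = model_deployer.find_model_server(
    pipeline_name="continuous_deployment_pipeline",
    pipeline_step_name="mlflow_model_deployer_step",
    running=True,
)

if services:
    service = services[0]
    # Illustrative payload: one row of feature values in the order the
    # deployed model expects (placeholder numbers, not real inputs)
    payload = np.array([[98.0, 1.0, 15.0, 8.0, 120.0, 30.0, 1000.0]])
    prediction = service.predict(payload)
    print(f"Predicted review score: {prediction[0]}")
else:
    print("No running prediction service found; run the deployment pipeline first.")
```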
diff --git a/customer-satisfaction/config.yaml b/customer-satisfaction/config.yaml
index aee04294..62ffe213 100644
--- a/customer-satisfaction/config.yaml
+++ b/customer-satisfaction/config.yaml
@@ -1,12 +1,15 @@
 enable_cache: False
+
 extra:
   tags: zenml-projects
+
 settings:
   docker:
     required_integrations:
       - mlflow
-steps:
-  model_train:
-    experiment_tracker: mlflow_tracker
-  evaluation:
-    experiment_tracker: mlflow_tracker
+#
+#steps:
+#  train_model:
+#    experiment_tracker: mlflow_tracker
+#  evaluation:
+#    experiment_tracker: mlflow_tracker
\ No newline at end of file
diff --git a/customer-satisfaction/model/data_cleaning.py b/customer-satisfaction/model/data_cleaning.py
index 0b510270..714ddf97 100644
--- a/customer-satisfaction/model/data_cleaning.py
+++ b/customer-satisfaction/model/data_cleaning.py
@@ -52,6 +52,9 @@ def preprocess_data(self) -> pd.DataFrame:
             ]
             self.df = self.df.drop(cols_to_drop, axis=1)
 
+            # Catch-all fillna in case any missing values were missed above
+            self.df.fillna(self.df.mean(), inplace=True)
+
             return self.df
         except Exception as e:
             logging.error(e)
diff --git a/customer-satisfaction/pipelines/deployment_pipeline.py b/customer-satisfaction/pipelines/deployment_pipeline.py
index 04b94c73..1f0a4c24 100644
--- a/customer-satisfaction/pipelines/deployment_pipeline.py
+++ b/customer-satisfaction/pipelines/deployment_pipeline.py
@@ -18,14 +18,21 @@
     enable_cache=False, model_version=model_version
 )
-def continuous_deployment_pipeline():
+def continuous_deployment_pipeline(
+    model_type: str = "lightgbm"
+):
     """Run a training job and deploy an mlflow model deployment."""
-    model, is_promoted = customer_satisfaction_training_pipeline()
+    # Run a training pipeline
+    model, is_promoted = customer_satisfaction_training_pipeline(model_type=model_type)
+
     # Fetch the production model from the Model Registry
-    production_model = model_loader(model_version.name)
+    production_model = model_loader(model_name=model_version.name, after="model_promoter")
+
+    # Only actually redeploy the model if the most recent training
+    # led to a model promotion
     mlflow_model_deployer_step(
         workers=3,
-        deploy_decision=True,
+        deploy_decision=is_promoted,
         model=production_model
     )
 
 
@@ -34,5 +41,9 @@
 def inference_pipeline():
     """Run a batch inference job with data loaded from an API."""
     batch_data = dynamic_importer()
-    model_deployment_service = prediction_service_loader()
+    model_deployment_service = prediction_service_loader(
+        pipeline_name="continuous_deployment_pipeline",
+        step_name="mlflow_model_deployer_step",
+        running=True
+    )
     predictor(model_deployment_service, batch_data)
diff --git a/customer-satisfaction/pipelines/training_pipeline.py b/customer-satisfaction/pipelines/training_pipeline.py
index c71f1444..aa7023ed 100644
--- a/customer-satisfaction/pipelines/training_pipeline.py
+++ b/customer-satisfaction/pipelines/training_pipeline.py
@@ -25,5 +25,5 @@ def customer_satisfaction_training_pipeline(
     x_train, x_test, y_train, y_test = clean_data(df)
     model = train_model(x_train=x_train, x_test=x_test, y_train=y_train, y_test=y_test, model_type=model_type)
     mse, rmse = evaluation(model, x_test, y_test)
-    is_promoted = model_promoter(accuracy=mse)
+    is_promoted = model_promoter(mse=mse)
     return model, is_promoted
\ No newline at end of file
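Both pipelines now accept a `model_type` argument and read their configuration from `config.yaml`. A minimal sketch of invoking them directly with ZenML's `with_options`, following the same pattern `run_deployment.py` uses below and assuming it is run from the `customer-satisfaction` directory:

```python
from pipelines.deployment_pipeline import continuous_deployment_pipeline
from pipelines.training_pipeline import customer_satisfaction_training_pipeline

if __name__ == "__main__":
    # Train a model; the model_promoter step decides whether it becomes the
    # new `production` version based on its MSE
    customer_satisfaction_training_pipeline.with_options(config_path="config.yaml")(
        model_type="lightgbm"
    )

    # Or run the full continuous deployment workflow, which only (re)deploys
    # the MLflow prediction server when the freshly trained model was promoted
    continuous_deployment_pipeline.with_options(config_path="config.yaml")(
        model_type="lightgbm"
    )
```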
diff --git a/customer-satisfaction/run_deployment.py b/customer-satisfaction/run_deployment.py
index 0b020c13..433c885a 100644
--- a/customer-satisfaction/run_deployment.py
+++ b/customer-satisfaction/run_deployment.py
@@ -1,5 +1,4 @@
 import click
-from zenml.client import Client
 
 from pipelines.deployment_pipeline import (
     continuous_deployment_pipeline,
@@ -7,35 +6,29 @@
     predictor,
 )
 from steps.dynamic_importer import dynamic_importer
-from steps.deployment_trigger import deployment_trigger
 from steps.prediction_service_loader import prediction_service_loader
 from rich import print
-from steps.clean_data import clean_data
-from steps.evaluation import evaluation
-from steps.ingest_data import ingest_data
-from steps.train_model import train_model
 from zenml.integrations.mlflow.mlflow_utils import get_tracking_uri
 from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (
     MLFlowModelDeployer,
 )
-from zenml.integrations.mlflow.steps import (
-    mlflow_model_deployer_step,
-)
 
 
 @click.command()
-@click.option(
-    "--min-accuracy",
-    default=1.8,
-    help="Minimum mse required to deploy the model",
-)
 @click.option(
     "--stop-service",
     is_flag=True,
     default=False,
     help="Stop the prediction service when done",
 )
-def run_main(min_accuracy: float, stop_service: bool, model_name="Customer_Satisfaction_Predictor"):
+@click.option(
+    "--model_type",
+    "-m",
+    type=click.Choice(["lightgbm", "randomforest", "xgboost"]),
+    default="xgboost",
+    help="Choose which type of model should be trained."
+)
+def run_main(stop_service: bool, model_type: str, model_name="Customer_Satisfaction_Predictor"):
     """Run the mlflow example pipeline"""
     if stop_service:
         # get the MLflow model deployer stack component
@@ -53,18 +46,11 @@ def run_main(min_accuracy: float, stop_service: bool, model_name="Customer_Satis
         existing_services[0].stop(timeout=10)
         return
 
-    deployment = continuous_deployment_pipeline(min_accuracy=min_accuracy)
-    deployment.run(config_path="config.yaml")
+    continuous_deployment_pipeline.with_options(config_path="config.yaml")(model_type=model_type)
 
-    inference = inference_pipeline(
-        dynamic_importer=dynamic_importer(),
-        prediction_service_loader=prediction_service_loader(
-            pipeline_name="continuous_deployment_pipeline",
-            step_name="model_deployer"
-        ),
-        predictor=predictor(),
-    )
-    inference.run()
+    model_deployer = MLFlowModelDeployer.get_active_model_deployer()
+
+    inference_pipeline()
 
     print(
         "Now run \n "
@@ -73,13 +59,10 @@ def run_main(min_accuracy: float, stop_service: bool, model_name="Customer_Satis
         "You can find your runs tracked within the `mlflow_example_pipeline`"
         "experiment. Here you'll also be able to compare the two runs.)"
     )
-
-    model_deployer = MLFlowModelDeployer.get_active_model_deployer()
-
     # fetch existing services with same pipeline name, step name and model name
     service = model_deployer.find_model_server(
         pipeline_name="continuous_deployment_pipeline",
-        pipeline_step_name="model_deployer",
+        pipeline_step_name="mlflow_model_deployer_step",
         running=True,
     )
 
@@ -92,6 +75,5 @@
             f"`--stop-service` argument."
         )
 
-
 if __name__ == "__main__":
     run_main()
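`inference_pipeline` now passes `pipeline_name`, `step_name`, and `running` down to `prediction_service_loader`, whose file is not part of this diff. Below is a sketch of what such a step might look like, under the assumption that it simply wraps `find_model_server`; the actual implementation in the repository may differ.

```python
from zenml import step
from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (
    MLFlowModelDeployer,
)
from zenml.integrations.mlflow.services import MLFlowDeploymentService


@step(enable_cache=False)
def prediction_service_loader(
    pipeline_name: str,
    step_name: str,
    running: bool = True,
) -> MLFlowDeploymentService:
    """Load the prediction service started by the deployment pipeline."""
    model_deployer = MLFlowModelDeployer.get_active_model_deployer()

    # Look up services that were deployed by the given pipeline and step
    services = model_deployer.find_model_server(
        pipeline_name=pipeline_name,
        pipeline_step_name=step_name,
        running=running,
    )
    if not services:
        raise RuntimeError(
            f"No running MLflow prediction service found for pipeline "
            f"'{pipeline_name}' and step '{step_name}'."
        )
    return services[0]
```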
diff --git a/customer-satisfaction/steps/model_loader.py b/customer-satisfaction/steps/model_loader.py
index a0934d73..63e19787 100644
--- a/customer-satisfaction/steps/model_loader.py
+++ b/customer-satisfaction/steps/model_loader.py
@@ -12,7 +12,6 @@ def model_loader(
     Args:
         model_name: Name of the Model to load
     """
-
     model_version = ModelVersion(
         name=model_name,
         version="production"
diff --git a/customer-satisfaction/steps/model_promoter.py b/customer-satisfaction/steps/model_promoter.py
index 838addc3..59949615 100644
--- a/customer-satisfaction/steps/model_promoter.py
+++ b/customer-satisfaction/steps/model_promoter.py
@@ -6,7 +6,7 @@
 
 @step
 def model_promoter(
-    accuracy: float,
+    mse: float,
     stage: str = "production"
 ) -> bool:
     """Model promotion step
@@ -15,7 +15,7 @@ def model_promoter(
     the previous production model
 
     Args:
-        accuracy: Accuracy of the model.
+        mse: Mean-squared error of the model.
         stage: Which stage to promote the model to.
 
     Returns:
@@ -31,17 +31,20 @@ def model_promoter(
     )
 
     try:
+        # In case there already is a model version at the correct stage
         previous_production_model_version_mse = float(
             previous_production_model.get_artifact("model").run_metadata["mse"].value
         )
     except RuntimeError:
-        previous_production_model_version_mse = 0.0
+        # In case no model version has been promoted before,
+        # default to a threshold value well above the new mse
+        previous_production_model_version_mse = mse + 1000
 
-    if accuracy < previous_production_model_version_mse:
+    if mse > previous_production_model_version_mse:
         logger.info(
-            f"Model accuracy {accuracy*100:.2f}% is below the accuracy of "
-            f"the previous production model "
-            f"{previous_production_model_version_mse*100:.2f}% ! "
+            f"Model mean-squared error {mse:.2f} is higher than"
+            f" the mse of the previous production model "
+            f"{previous_production_model_version_mse:.2f} ! "
             f"Not promoting model."
         )
         is_promoted = False
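The hunk above ends inside the "not promoted" branch. Below is a hedged sketch of how the promotion branch of such a step might conclude, assuming the run has a Model Control Plane model version attached (the step name `promote_if_better` and the exact calls are illustrative and not taken from the repository).

```python
from zenml import get_step_context, step
from zenml.logger import get_logger

logger = get_logger(__name__)


@step
def promote_if_better(mse: float, previous_mse: float, stage: str = "production") -> bool:
    """Promote the model version attached to this run if its MSE is lower."""
    if mse > previous_mse:
        logger.info(
            f"Model MSE {mse:.2f} is higher than the previous production MSE "
            f"{previous_mse:.2f}. Not promoting."
        )
        return False

    # Mark the model version that produced this run as the new target stage;
    # force=True demotes whichever version currently holds that stage.
    model_version = get_step_context().model_version
    model_version.set_stage(stage, force=True)
    logger.info(f"Model promoted to `{stage}`.")
    return True
```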