Further fixes
AlexejPenner committed Dec 21, 2023
1 parent f9420f1 commit cc2cbab
Showing 8 changed files with 62 additions and 84 deletions.
45 changes: 11 additions & 34 deletions customer-satisfaction/README.md
@@ -117,27 +117,25 @@ Our standard training pipeline consists of several steps:
using [MLflow autologging](https://www.mlflow.org/docs/latest/tracking.html).
- `evaluation`: This step will evaluate the model and save the metrics -- using
MLflow autologging -- into the artifact store.
- `model_promoter`: This step compares the newly trained model against the previous
production model; if the new model performed better, it is promoted.

### Deployment Pipeline

We have another pipeline, the `deployment_pipeline.py`, that extends the
training pipeline, and implements a continuous deployment workflow. It ingests
and processes input data, trains a model and then (re)deploys the prediction
server that serves the model if it meets our evaluation criteria. The criteria
that we have chosen is a configurable threshold on
the [MSE](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.mean_squared_error.html)
of the training. The first four steps of the pipeline are the same as above, but
server that serves the model if it meets the promotion criteria.
of the training. The first five steps of the pipeline are the same as above, but
we have added the following additional ones:

- `deployment_trigger`: The step checks whether the newly trained model meets
the criteria set for deployment.
- `model_loader`: The step loads the `production` model from the ZenML Model Registry.
- `model_deployer`: This step deploys the model as a service using MLflow (if
deployment criteria is met).

In the deployment pipeline, ZenML's MLflow tracking integration is used for
logging the hyperparameter values and the trained model itself and the model
evaluation metrics -- as MLflow experiment tracking artifacts -- into the local
MLflow backend. This pipeline also launches a local MLflow deployment server to
In the deployment pipeline, ZenML's Model Control Plane is used for
attaching the evaluation metrics as metadata to the trained model.
This pipeline also launches a local MLflow deployment server to
serve the latest MLflow model if its accuracy is above a configured threshold.

The MLflow deployment server runs locally as a daemon process that will continue
@@ -151,9 +149,9 @@ model service asynchronously from the pipeline logic. This can be done easily
with ZenML within the Streamlit code:

```python
service = load_last_service_from_step(
service = model_deployer.find_model_server(
pipeline_name="continuous_deployment_pipeline",
step_name="model_deployer",
pipeline_step_name="mlflow_model_deployer_step",
running=True,
)
...
@@ -200,28 +198,7 @@ streamlit run streamlit_app.py

## :question: FAQ

1. When running the continuous deployment pipeline, I get an error
stating: `No Step found for the name mlflow_deployer`.

Solution: It happens because your artifact store is overridden after running
the continuous deployment pipeline. So, you need to delete the artifact store
and rerun the pipeline. You can get the location of the artifact store by
running the following command:

```bash
zenml artifact-store describe
```

and then you can delete the artifact store with the following command:

**Note**: This is a dangerous / destructive command! Please enter your path
carefully, otherwise it may delete other folders from your computer.

```bash
rm -rf PATH
```

2. When running the continuous deployment pipeline, I get the following
1. When running the continuous deployment pipeline, I get the following
error: `No Environment component with name mlflow is currently registered.`

Solution: You forgot to install the MLflow integration in your ZenML
13 changes: 8 additions & 5 deletions customer-satisfaction/config.yaml
@@ -1,12 +1,15 @@
enable_cache: False

extra:
tags: zenml-projects

settings:
docker:
required_integrations:
- mlflow
steps:
model_train:
experiment_tracker: mlflow_tracker
evaluation:
experiment_tracker: mlflow_tracker
#
#steps:
# train_model:
# experiment_tracker: mlflow_tracker
# evaluation:
# experiment_tracker: mlflow_tracker
3 changes: 3 additions & 0 deletions customer-satisfaction/model/data_cleaning.py
@@ -52,6 +52,9 @@ def preprocess_data(self) -> pd.DataFrame:
]
self.df = self.df.drop(cols_to_drop, axis=1)

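# Catch-all fillna in case any missing values were missed above.
# numeric_only=True keeps mean() from failing on non-numeric columns.
self.df.fillna(self.df.mean(numeric_only=True), inplace=True)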

return self.df
except Exception as e:
logging.error(e)
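Review note: the catch-all above replaces every remaining missing value with the mean of its column. The following is a minimal plain-Python sketch of that idea, not the project's code; `fill_missing_with_column_mean` and the sample columns are hypothetical names chosen for illustration, and `None` stands in for a missing value.

```python
from statistics import mean
from typing import Dict, List, Optional


def fill_missing_with_column_mean(
    columns: Dict[str, List[Optional[float]]]
) -> Dict[str, List[Optional[float]]]:
    """Replace None entries with the mean of the non-missing values in the same column."""
    filled: Dict[str, List[Optional[float]]] = {}
    for name, values in columns.items():
        present = [v for v in values if v is not None]
        if not present:
            # A column with no observed values has no mean; leave it unchanged.
            filled[name] = list(values)
            continue
        col_mean = mean(present)
        filled[name] = [col_mean if v is None else v for v in values]
    return filled


# Hypothetical sample data with one gap per column.
example = {"price": [10.0, None, 30.0], "weight": [1.0, 2.0, None]}
result = fill_missing_with_column_mean(example)
```

Note that a column with no observed values stays empty in this sketch, so a catch-all of this kind does not guarantee a gap-free frame on its own.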
21 changes: 16 additions & 5 deletions customer-satisfaction/pipelines/deployment_pipeline.py
@@ -18,14 +18,21 @@
enable_cache=False,
model_version=model_version
)
def continuous_deployment_pipeline():
def continuous_deployment_pipeline(
model_type: str = "lightgbm"
):
"""Run a training job and deploy an mlflow model deployment."""
model, is_promoted = customer_satisfaction_training_pipeline()
# Run a training pipeline
model, is_promoted = customer_satisfaction_training_pipeline(model_type=model_type)

# Fetch the production model from the Model Registry
production_model = model_loader(model_version.name)
production_model = model_loader(model_name=model_version.name, after="model_promoter")

# Only actually redeploy the model if the most recent training
# led to a model promotion
mlflow_model_deployer_step(
workers=3,
deploy_decision=True,
deploy_decision=is_promoted,
model=production_model
)

@@ -34,5 +41,9 @@ def continuous_deployment_pipeline():
def inference_pipeline():
"""Run a batch inference job with data loaded from an API."""
batch_data = dynamic_importer()
model_deployment_service = prediction_service_loader()
model_deployment_service = prediction_service_loader(
pipeline_name="continuous_deployment_pipeline",
step_name="mlflow_model_deployer_step",
running=True
)
predictor(model_deployment_service, batch_data)
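Review note: the key behavioral change in this file is that redeployment is now gated on `is_promoted` instead of a hard-coded `True`. A rough sketch of that control flow in plain Python, with no ZenML involved; `run_continuous_deployment` and the three callables are hypothetical stand-ins for the training pipeline, `model_loader`, and the deployer step.

```python
from typing import Any, Callable, List, Tuple


def run_continuous_deployment(
    train: Callable[[], Tuple[Any, bool]],
    load_production_model: Callable[[], Any],
    deploy: Callable[[Any], None],
) -> bool:
    """Train, then redeploy the production model only if training led to a promotion."""
    _model, is_promoted = train()
    # Loaded after the promotion decision, so it reflects the current production stage.
    production_model = load_production_model()
    if is_promoted:
        deploy(production_model)
    return is_promoted


deployed: List[Any] = []

# First run: the new model is promoted, so a deployment happens.
first = run_continuous_deployment(
    train=lambda: ("new-model", True),
    load_production_model=lambda: "production-model",
    deploy=deployed.append,
)

# Second run: no promotion, so the running service is left untouched.
second = run_continuous_deployment(
    train=lambda: ("worse-model", False),
    load_production_model=lambda: "production-model",
    deploy=deployed.append,
)
```

In the actual pipeline the gating is expressed by passing `deploy_decision=is_promoted` to the deployer step rather than with an explicit `if`.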
2 changes: 1 addition & 1 deletion customer-satisfaction/pipelines/training_pipeline.py
@@ -25,5 +25,5 @@ def customer_satisfaction_training_pipeline(
x_train, x_test, y_train, y_test = clean_data(df)
model = train_model(x_train=x_train, x_test=x_test, y_train=y_train, y_test=y_test, model_type=model_type)
mse, rmse = evaluation(model, x_test, y_test)
is_promoted = model_promoter(accuracy=mse)
is_promoted = model_promoter(mse=mse)
return model, is_promoted
44 changes: 13 additions & 31 deletions customer-satisfaction/run_deployment.py
@@ -1,41 +1,34 @@
import click
from zenml.client import Client

from pipelines.deployment_pipeline import (
continuous_deployment_pipeline,
inference_pipeline,
predictor,
)
from steps.dynamic_importer import dynamic_importer
from steps.deployment_trigger import deployment_trigger
from steps.prediction_service_loader import prediction_service_loader
from rich import print
from steps.clean_data import clean_data
from steps.evaluation import evaluation
from steps.ingest_data import ingest_data
from steps.train_model import train_model
from zenml.integrations.mlflow.mlflow_utils import get_tracking_uri
from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (
MLFlowModelDeployer,
)
from zenml.integrations.mlflow.steps import (
mlflow_model_deployer_step,
)


@click.command()
@click.option(
"--min-accuracy",
default=1.8,
help="Minimum mse required to deploy the model",
)
@click.option(
"--stop-service",
is_flag=True,
default=False,
help="Stop the prediction service when done",
)
def run_main(min_accuracy: float, stop_service: bool, model_name="Customer_Satisfaction_Predictor"):
@click.option(
"--model_type",
"-m",
type=click.Choice(["lightgbm", "randomforest", "xgboost"]),
default="xgboost",
help="Here you can choose what type of model should be trained."
)
def run_main(stop_service: bool, model_type: str, model_name="Customer_Satisfaction_Predictor"):
"""Run the mlflow example pipeline"""
if stop_service:
# get the MLflow model deployer stack component
@@ -53,18 +46,11 @@ def run_main(min_accuracy: float, stop_service: bool, model_name="Customer_Satis
existing_services[0].stop(timeout=10)
return

deployment = continuous_deployment_pipeline(min_accuracy=min_accuracy)
deployment.run(config_path="config.yaml")
continuous_deployment_pipeline.with_options(config_path="config.yaml")(model_type=model_type)

inference = inference_pipeline(
dynamic_importer=dynamic_importer(),
prediction_service_loader=prediction_service_loader(
pipeline_name="continuous_deployment_pipeline",
step_name="model_deployer"
),
predictor=predictor(),
)
inference.run()
model_deployer = MLFlowModelDeployer.get_active_model_deployer()

inference_pipeline()

print(
"Now run \n "
@@ -73,13 +59,10 @@ def run_main(min_accuracy: float, stop_service: bool, model_name="Customer_Satis
"You can find your runs tracked within the `mlflow_example_pipeline` "
"experiment. Here you'll also be able to compare the two runs.)"
)

model_deployer = MLFlowModelDeployer.get_active_model_deployer()

# fetch existing services with same pipeline name, step name and model name
service = model_deployer.find_model_server(
pipeline_name="continuous_deployment_pipeline",
pipeline_step_name="model_deployer",
pipeline_step_name="mlflow_model_deployer_step",
running=True,
)

@@ -92,6 +75,5 @@ def run_main(min_accuracy: float, stop_service: bool, model_name="Customer_Satis
f"`--stop-service` argument."
)


if __name__ == "__main__":
run_main()
1 change: 0 additions & 1 deletion customer-satisfaction/steps/model_loader.py
@@ -12,7 +12,6 @@ def model_loader(
Args:
model_name: Name of the Model to load
"""

model_version = ModelVersion(
name=model_name,
version="production"
17 changes: 10 additions & 7 deletions customer-satisfaction/steps/model_promoter.py
@@ -6,7 +6,7 @@

@step
def model_promoter(
accuracy: float,
mse: float,
stage: str = "production"
) -> bool:
"""Model promotion step
@@ -15,7 +15,7 @@ def model_promoter(
the previous production model
Args:
accuracy: Accuracy of the model.
mse: Mean-squared error of the model.
stage: Which stage to promote the model to.
Returns:
@@ -31,17 +31,20 @@ def model_promoter(
)

try:
# In case there already is a model version at the correct stage
previous_production_model_version_mse = float(
previous_production_model.get_artifact("model").run_metadata["mse"].value
)
except RuntimeError:
previous_production_model_version_mse = 0.0
# In case no model version has been promoted before,
# default to a threshold value well above the new mse
previous_production_model_version_mse = mse + 1000

if accuracy < previous_production_model_version_mse:
if mse > previous_production_model_version_mse:
logger.info(
f"Model accuracy {accuracy*100:.2f}% is below the accuracy of "
f"the previous production model "
f"{previous_production_model_version_mse*100:.2f}% ! "
f"Model mean-squared error {mse:.2f} is higher than"
f" the mse of the previous production model "
f"{previous_production_model_version_mse:.2f} ! "
f"Not promoting model."
)
is_promoted = False
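Review note: the corrected comparison treats MSE as lower-is-better, and the first run has no production model to compare against. A minimal sketch of that decision rule follows, assuming a hypothetical helper `should_promote`; it uses `None` for "no previous production model" in place of the `mse + 1000` sentinel in this commit, which is a stylistic alternative and not what the commit does.

```python
from typing import Optional


def should_promote(new_mse: float, previous_mse: Optional[float]) -> bool:
    """Decide promotion for an error metric where lower values are better."""
    if previous_mse is None:
        # Nothing has been promoted yet, so the first model always wins.
        return True
    # Mirrors the diff: the model is rejected only when its MSE is strictly higher.
    return new_mse <= previous_mse


first_run = should_promote(1.2, None)
improved = should_promote(1.2, 1.5)
regressed = should_promote(1.8, 1.5)
tie = should_promote(1.5, 1.5)
```

As in the diff, a tie promotes the new model, since only a strictly higher MSE blocks promotion.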