Skip to content

Commit

Permalink
added updates
Browse files Browse the repository at this point in the history
  • Loading branch information
ayush714 committed Jul 20, 2023
1 parent 1f3048e commit 1b8f892
Show file tree
Hide file tree
Showing 13 changed files with 469 additions and 464 deletions.
Binary file added .DS_Store
Binary file not shown.
57 changes: 27 additions & 30 deletions customer-satisfaction/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ cd zenml-projects/customer-satisfaction
pip install -r requirements.txt
```

Starting with ZenML 0.20.0, ZenML comes bundled with a React-based dashboard. This dashboard allows you
Starting with ZenML 0.20.0, ZenML comes bundled with a React-based dashboard. This dashboard allows you
to observe your stacks, stack components and pipeline DAGs in a dashboard interface. To access this, you need to [launch the ZenML Server and Dashboard locally](https://docs.zenml.io/user-guide/starter-guide#explore-the-dashboard), but first you must install the optional dependencies for the ZenML server:

```bash
Expand All @@ -32,7 +32,8 @@ If you are running the `run_deployment.py` script, you will also need to install

```bash
zenml integration install mlflow -y
```
```

The project can only be executed with a ZenML stack that has an MLflow experiment tracker and model deployer as a component. Configuring a new stack with the two components are as follows:

```bash
Expand Down Expand Up @@ -71,22 +72,20 @@ Our standard training pipeline consists of several steps:

We have another pipeline, the `deployment_pipeline.py`, that extends the training pipeline, and implements a continuous deployment workflow. It ingests and processes input data, trains a model and then (re)deploys the prediction server that serves the model if it meets our evaluation criteria. The criteria that we have chosen is a configurable threshold on the [MSE](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.mean_squared_error.html) of the training. The first four steps of the pipeline are the same as above, but we have added the following additional ones:


- `deployment_trigger`: The step checks whether the newly trained model meets the criteria set for deployment.
- `model_deployer`: This step deploys the model as a service using MLflow (if deployment criteria is met).


In the deployment pipeline, ZenML's MLflow tracking integration is used for logging the hyperparameter values and the trained model itself and the model evaluation metrics -- as MLflow experiment tracking artifacts -- into the local MLflow backend. This pipeline also launches a local MLflow deployment server to serve the latest MLflow model if its accuracy is above a configured threshold.

The MLflow deployment server runs locally as a daemon process that will continue to run in the background after the example execution is complete. When a new pipeline is run which produces a model that passes the accuracy threshold validation, the pipeline automatically updates the currently running MLflow deployment server to serve the new model instead of the old one.

To round it off, we deploy a Streamlit application that consumes the latest model service asynchronously from the pipeline logic. This can be done easily with ZenML within the Streamlit code:

```python
service = load_last_service_from_step(
pipeline_name="continuous_deployment_pipeline",
step_name="model_deployer",
running=True,
service = prediction_service_loader(
pipeline_name="continuous_deployment_pipeline",
pipeline_step_name="mlflow_model_deployer_step",
running=False,
)
...
service.predict(...) # Predict on incoming data from the application
Expand All @@ -100,20 +99,18 @@ While this ZenML Project trains and deploys a model locally, other ZenML integra

You can run two pipelines as follows:

- Training pipeline:
- Training pipeline:

```bash
python run_pipeline.py
```

- The continuous deployment pipeline:


```bash
python run_deployment.py
```


## 🕹 Demo Streamlit App

There is a live demo of this project using [Streamlit](https://streamlit.io/) which you can find [here](https://share.streamlit.io/ayush714/customer-satisfaction/main). It takes some input features for the product and predicts the customer satisfaction rate using the latest trained models. If you want to run this Streamlit app in your local system, you can run the following command:-
Expand All @@ -126,24 +123,24 @@ streamlit run streamlit_app.py

1. When running the continuous deployment pipeline, I get an error stating: `No Step found for the name mlflow_deployer`.

Solution: It happens because your artifact store is overridden after running the continuous deployment pipeline. So, you need to delete the artifact store and rerun the pipeline. You can get the location of the artifact store by running the following command:
```bash
zenml artifact-store describe
```
and then you can delete the artifact store with the following command:
**Note**: This is a dangerous / destructive command! Please enter your path carefully, otherwise it may delete other folders from your computer.
```bash
rm -rf PATH
```
Solution: It happens because your artifact store is overridden after running the continuous deployment pipeline. So, you need to delete the artifact store and rerun the pipeline. You can get the location of the artifact store by running the following command:

```bash
zenml artifact-store describe
```

and then you can delete the artifact store with the following command:

**Note**: This is a dangerous / destructive command! Please enter your path carefully, otherwise it may delete other folders from your computer.

```bash
rm -rf PATH
```

2. When running the continuous deployment pipeline, I get the following error: `No Environment component with name mlflow is currently registered.`
Solution: You forgot to install the MLflow integration in your ZenML environment. So, you need to install the MLflow integration by running the following command:
```bash
zenml integration install mlflow -y
```

Solution: You forgot to install the MLflow integration in your ZenML environment. So, you need to install the MLflow integration by running the following command:

```bash
zenml integration install mlflow -y
```
79 changes: 48 additions & 31 deletions customer-satisfaction/model/data_cleaning.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,33 @@
import logging
from abc import ABC, abstractmethod
from typing import Union

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split


class DataCleaning:
class DataStrategy(ABC):
"""
Data cleaning class which preprocesses the data and divides it into train and test data.
Abstract Class defining strategy for handling data
"""

def __init__(self, data: pd.DataFrame) -> None:
"""Initializes the DataCleaning class."""
self.df = data
@abstractmethod
def handle_data(self, data: pd.DataFrame) -> Union[pd.DataFrame, pd.Series]:
pass


class DataPreprocessStrategy(DataStrategy):
"""
Data preprocessing strategy which preprocesses the data.
"""

def preprocess_data(self) -> pd.DataFrame:
def handle_data(self, data: pd.DataFrame) -> pd.DataFrame:
"""
Removes columns which are not required, fills missing values with median average values, and converts the data type to float.
"""
try:
self.df = self.df.drop(
data = data.drop(
[
"order_approved_at",
"order_delivered_carrier_date",
Expand All @@ -30,44 +37,54 @@ def preprocess_data(self) -> pd.DataFrame:
],
axis=1,
)
self.df["product_weight_g"].fillna(
self.df["product_weight_g"].median(), inplace=True
)
self.df["product_length_cm"].fillna(
self.df["product_length_cm"].median(), inplace=True
)
self.df["product_height_cm"].fillna(
self.df["product_height_cm"].median(), inplace=True
)
self.df["product_width_cm"].fillna(
self.df["product_width_cm"].median(), inplace=True
)
data["product_weight_g"].fillna(data["product_weight_g"].median(), inplace=True)
data["product_length_cm"].fillna(data["product_length_cm"].median(), inplace=True)
data["product_height_cm"].fillna(data["product_height_cm"].median(), inplace=True)
data["product_width_cm"].fillna(data["product_width_cm"].median(), inplace=True)
# write "No review" in review_comment_message column
self.df["review_comment_message"].fillna("No review", inplace=True)
data["review_comment_message"].fillna("No review", inplace=True)

self.df = self.df.select_dtypes(include=[np.number])
cols_to_drop = [
"customer_zip_code_prefix",
"order_item_id",
]
self.df = self.df.drop(cols_to_drop, axis=1)
data = data.select_dtypes(include=[np.number])
cols_to_drop = ["customer_zip_code_prefix", "order_item_id"]
data = data.drop(cols_to_drop, axis=1)

return self.df
return data
except Exception as e:
logging.error(e)
raise e

def divide_data(self, df: pd.DataFrame) -> Union[pd.DataFrame, pd.Series]:

class DataDivideStrategy(DataStrategy):
"""
Data dividing strategy which divides the data into train and test data.
"""

def handle_data(self, data: pd.DataFrame) -> Union[pd.DataFrame, pd.Series]:
"""
It divides the data into train and test data.
Divides the data into train and test data.
"""
try:
X = df.drop("review_score", axis=1)
y = df["review_score"]
X = data.drop("review_score", axis=1)
y = data["review_score"]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
return X_train, X_test, y_train, y_test
except Exception as e:
logging.error(e)
raise e


class DataCleaning:
"""
Data cleaning class which preprocesses the data and divides it into train and test data.
"""

def __init__(self, data: pd.DataFrame, strategy: DataStrategy) -> None:
"""Initializes the DataCleaning class with a specific strategy."""
self.df = data
self.strategy = strategy

def handle_data(self) -> Union[pd.DataFrame, pd.Series]:
"""Handle data based on the provided strategy"""
return self.strategy.handle_data(self.df)
105 changes: 41 additions & 64 deletions customer-satisfaction/model/evaluation.py
Original file line number Diff line number Diff line change
@@ -1,112 +1,89 @@
import logging
from abc import ABC, abstractmethod

import numpy as np
from sklearn.metrics import mean_squared_error, r2_score


class Evaluation:
class Evaluation(ABC):
"""
Evaluation class which evaluates the model performance using the sklearn metrics.
Abstract Class defining the strategy for evaluating model performance
"""

def __init__(self) -> None:
"""Initializes the Evaluation class."""
@abstractmethod
def calculate_score(self, y_true: np.ndarray, y_pred: np.ndarray) -> float:
pass

def mean_squared_error(
self, y_true: np.ndarray, y_pred: np.ndarray
) -> float:

class MSE(Evaluation):
"""
Evaluation strategy that uses Mean Squared Error (MSE)
"""
def calculate_score(self, y_true: np.ndarray, y_pred: np.ndarray) -> float:
"""
Mean Squared Error (MSE) is the mean of the squared errors.
Args:
y_true: np.ndarray
y_pred: np.ndarray
Returns:
mse: float
"""
try:
logging.info(
"Entered the mean_squared_error method of the Evaluation class",
)
logging.info("Entered the calculate_score method of the MSE class")
mse = mean_squared_error(y_true, y_pred)
logging.info(
"The mean squared error value is: " + str(mse),
)

logging.info("The mean squared error value is: " + str(mse))
return mse
except Exception as e:
logging.info(
"Exception occurred in mean_squared_error method of the Evaluation class. Exception message: "
+ str(e),
)
logging.info(
"Exited the mean_squared_error method of the Evaluation class",
logging.error(
"Exception occurred in calculate_score method of the MSE class. Exception message: "
+ str(e)
)
raise Exception()
raise e

def r2_score(self, y_true: np.ndarray, y_pred: np.ndarray) -> float:
"""
R2 Score (R2) is a statistical measure of how close the observed values
are to the predicted values. It is also known as the coefficient of
determination.

class R2Score(Evaluation):
"""
Evaluation strategy that uses R2 Score
"""
def calculate_score(self, y_true: np.ndarray, y_pred: np.ndarray) -> float:
"""
Args:
y_true: np.ndarray
y_pred: np.ndarray
Returns:
r2_score: float
"""
try:
logging.info(
"Entered the r2_score method of the Evaluation class",
)
logging.info("Entered the calculate_score method of the R2Score class")
r2 = r2_score(y_true, y_pred)
logging.info(
"The r2 score value is: " + str(r2),
)
logging.info(
"Exited the r2_score method of the Evaluation class",
)
logging.info("The r2 score value is: " + str(r2))
return r2
except Exception as e:
logging.info(
"Exception occurred in r2_score method of the Evaluation class. Exception message: "
+ str(e),
logging.error(
"Exception occurred in calculate_score method of the R2Score class. Exception message: "
+ str(e)
)
logging.info(
"Exited the r2_score method of the Evaluation class",
)
raise Exception()
raise e

def root_mean_squared_error(
self, y_true: np.ndarray, y_pred: np.ndarray
) -> float:

class RMSE(Evaluation):
"""
Evaluation strategy that uses Root Mean Squared Error (RMSE)
"""
def calculate_score(self, y_true: np.ndarray, y_pred: np.ndarray) -> float:
"""
Root Mean Squared Error (RMSE) is the square root of the mean of the
squared errors.
Args:
y_true: np.ndarray
y_pred: np.ndarray
Return:
Returns:
rmse: float
"""
try:
logging.info(
"Entered the root_mean_squared_error method of the Evaluation class",
)
logging.info("Entered the calculate_score method of the RMSE class")
rmse = np.sqrt(mean_squared_error(y_true, y_pred))
logging.info(
"The root mean squared error value is: " + str(rmse),
)
logging.info("The root mean squared error value is: " + str(rmse))
return rmse
except Exception as e:
logging.info(
"Exception occurred in root_mean_squared_error method of the Evaluation class. Exception message: "
+ str(e),
)
logging.info(
"Exited the root_mean_squared_error method of the Evaluation class",
logging.error(
"Exception occurred in calculate_score method of the RMSE class. Exception message: "
+ str(e)
)
raise Exception()
raise e
Loading

0 comments on commit 1b8f892

Please sign in to comment.