Showing 1 changed file with 349 additions and 0 deletions.
@@ -0,0 +1,349 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Iris Classification Pipeline with ZenML\n",
"\n",
"This notebook demonstrates a ZenML pipeline for iris classification, including data loading, model training, evaluation, explainability, and data drift detection."
]
},
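{
"cell_type": "markdown",
"metadata": {},
"source": [
"The pipeline's Docker settings below reference a `requirements.txt`; for running the notebook locally, a rough equivalent is to install the main dependencies up front. A minimal sketch, assuming no version pins beyond what the code imports:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hypothetical local setup; the exact pins live in requirements.txt.\n",
"%pip install zenml pandas scikit-learn shap scipy"
]
},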
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from zenml.client import Client"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"Client().activate_stack(\"default\")\n",
"# Client().activate_stack(\"ihopeitworks2\")"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"from sklearn.datasets import load_iris\n",
"from sklearn.model_selection import train_test_split\n",
"from zenml import step\n",
"from zenml import log_artifact_metadata\n",
"from typing import Tuple, Dict, Any\n",
"from typing_extensions import Annotated\n",
"\n",
"def safe_metadata(data: Any) -> Dict[str, Any]:\n",
"    \"\"\"Create metadata dict with only supported types.\"\"\"\n",
"    metadata = {\"shape\": data.shape}\n",
"    if isinstance(data, pd.DataFrame):\n",
"        metadata[\"columns\"] = list(data.columns)\n",
"    return metadata\n",
"\n",
"@step\n",
"def load_data() -> Tuple[\n",
"    Annotated[pd.DataFrame, \"X_train\"],\n",
"    Annotated[pd.DataFrame, \"X_test\"],\n",
"    Annotated[pd.Series, \"y_train\"],\n",
"    Annotated[pd.Series, \"y_test\"],\n",
"]:\n",
"    \"\"\"Load the iris dataset and split into train and test sets.\"\"\"\n",
"    iris = load_iris(as_frame=True)\n",
"    X = iris.data\n",
"    y = iris.target\n",
"    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)\n",
"\n",
"    for name, data in [(\"X_train\", X_train), (\"X_test\", X_test), (\"y_train\", y_train), (\"y_test\", y_test)]:\n",
"        log_artifact_metadata(\n",
"            artifact_name=name,\n",
"            metadata={\"dataset_info\": safe_metadata(data)}\n",
"        )\n",
"\n",
"    return X_train, X_test, y_train, y_test"
]
},
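{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick local check of `safe_metadata` (a sketch, assuming the cell above has been run): it records the shape for any array-like input and adds column names only for DataFrames, keeping the logged metadata within types ZenML can serialize."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Not a pipeline step: just exercising safe_metadata on the raw iris data.\n",
"iris_check = load_iris(as_frame=True)\n",
"print(safe_metadata(iris_check.data))    # DataFrame -> shape + columns\n",
"print(safe_metadata(iris_check.target))  # Series -> shape only"
]
},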
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"from sklearn.svm import SVC\n",
"from zenml import step, ArtifactConfig\n",
"from zenml import log_model_metadata, log_artifact_metadata\n",
"from typing_extensions import Annotated\n",
"\n",
"@step\n",
"def train_model(\n",
"    X_train: pd.DataFrame,\n",
"    y_train: pd.Series,\n",
") -> Annotated[SVC, ArtifactConfig(name=\"model\", is_model_artifact=True)]:\n",
"    \"\"\"Train an SVM classifier.\"\"\"\n",
"    model = SVC(kernel='rbf', probability=True)\n",
"    model.fit(X_train, y_train)\n",
"    train_accuracy = model.score(X_train, y_train)\n",
"\n",
"    log_model_metadata(\n",
"        metadata={\n",
"            \"training_metrics\": {\n",
"                \"train_accuracy\": float(train_accuracy),\n",
"            },\n",
"            \"model_info\": {\n",
"                \"model_type\": type(model).__name__,\n",
"                \"kernel\": model.kernel,\n",
"            }\n",
"        }\n",
"    )\n",
"\n",
"    log_artifact_metadata(\n",
"        artifact_name=\"model\",\n",
"        metadata={\n",
"            \"model_details\": {\n",
"                \"type\": type(model).__name__,\n",
"                \"kernel\": model.kernel,\n",
"                \"n_support\": model.n_support_.tolist(),\n",
"            }\n",
"        }\n",
"    )\n",
"\n",
"    return model"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"from sklearn.svm import SVC\n",
"from zenml import step\n",
"from zenml import log_model_metadata, log_artifact_metadata\n",
"from typing import Tuple\n",
"from typing_extensions import Annotated\n",
"\n",
"@step\n",
"def evaluate_model(\n",
"    model: SVC,\n",
"    X_test: pd.DataFrame,\n",
"    y_test: pd.Series,\n",
") -> Tuple[\n",
"    Annotated[np.ndarray, \"predictions\"],\n",
"    Annotated[np.ndarray, \"probabilities\"]\n",
"]:\n",
"    \"\"\"Evaluate the model and make predictions.\"\"\"\n",
"    test_accuracy = model.score(X_test, y_test)\n",
"    predictions = model.predict(X_test)\n",
"    probabilities = model.predict_proba(X_test)\n",
"\n",
"    log_model_metadata(\n",
"        metadata={\n",
"            \"evaluation_metrics\": {\n",
"                \"test_accuracy\": float(test_accuracy),\n",
"            }\n",
"        }\n",
"    )\n",
"\n",
"    log_artifact_metadata(\n",
"        artifact_name=\"predictions\",\n",
"        metadata={\n",
"            \"prediction_info\": {\n",
"                \"shape\": predictions.shape,\n",
"                \"unique_values\": np.unique(predictions).tolist()\n",
"            }\n",
"        }\n",
"    )\n",
"\n",
"    log_artifact_metadata(\n",
"        artifact_name=\"probabilities\",\n",
"        metadata={\n",
"            \"probability_info\": {\n",
"                \"shape\": probabilities.shape,\n",
"                \"min\": float(np.min(probabilities)),\n",
"                \"max\": float(np.max(probabilities))\n",
"            }\n",
"        }\n",
"    )\n",
"\n",
"    return predictions, probabilities"
]
},
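{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a standalone sanity check of the evaluation logic outside ZenML, the same split and model can be scored directly with scikit-learn. A minimal sketch (hypothetical helper code, not part of the pipeline):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from sklearn.datasets import load_iris\n",
"from sklearn.model_selection import train_test_split\n",
"from sklearn.svm import SVC\n",
"from sklearn.metrics import classification_report\n",
"\n",
"# Same split parameters as load_data, so results should match the step.\n",
"iris_df = load_iris(as_frame=True)\n",
"X_tr, X_te, y_tr, y_te = train_test_split(iris_df.data, iris_df.target, test_size=0.2, random_state=42)\n",
"clf = SVC(kernel='rbf', probability=True).fit(X_tr, y_tr)\n",
"print(classification_report(y_te, clf.predict(X_te), target_names=iris_df.target_names))"
]
},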
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import shap\n",
"from sklearn.svm import SVC\n",
"from zenml import step\n",
"from zenml import log_artifact_metadata\n",
"from typing_extensions import Annotated\n",
"\n",
"class SHAPVisualization:\n",
"    def __init__(self, shap_values, feature_names):\n",
"        self.shap_values = shap_values\n",
"        self.feature_names = feature_names\n",
"\n",
"@step\n",
"def explain_model(\n",
"    model: SVC,\n",
"    X_train: pd.DataFrame\n",
") -> Annotated[SHAPVisualization, \"shap_visualization\"]:\n",
"    \"\"\"Generate SHAP values for model explainability and create a visualization.\"\"\"\n",
"    explainer = shap.KernelExplainer(model.predict_proba, shap.sample(X_train, 100))\n",
"    shap_values = explainer.shap_values(X_train.iloc[:100])\n",
"\n",
"    # The artifact name must match this step's annotated output,\n",
"    # \"shap_visualization\"; this step produces no artifact named \"shap_values\".\n",
"    log_artifact_metadata(\n",
"        artifact_name=\"shap_visualization\",\n",
"        metadata={\n",
"            \"shap_info\": {\n",
"                \"shape\": [arr.shape for arr in shap_values],\n",
"                \"n_classes\": len(shap_values),\n",
"                \"n_features\": shap_values[0].shape[1],\n",
"            }\n",
"        }\n",
"    )\n",
"\n",
"    return SHAPVisualization(shap_values, X_train.columns)"
]
},
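{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `SHAPVisualization` object only carries the raw values; to actually look at them, `shap.summary_plot` is the usual choice. A minimal, self-contained sketch (recomputing on a small sample rather than reusing pipeline artifacts, and assuming a shap version where `KernelExplainer` returns one array per class):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import shap\n",
"from sklearn.datasets import load_iris\n",
"from sklearn.model_selection import train_test_split\n",
"from sklearn.svm import SVC\n",
"\n",
"iris_viz = load_iris(as_frame=True)\n",
"X_tr, _, y_tr, _ = train_test_split(iris_viz.data, iris_viz.target, test_size=0.2, random_state=42)\n",
"clf = SVC(kernel='rbf', probability=True).fit(X_tr, y_tr)\n",
"\n",
"# A small background sample keeps KernelExplainer reasonably fast.\n",
"explainer = shap.KernelExplainer(clf.predict_proba, shap.sample(X_tr, 25))\n",
"sv = explainer.shap_values(X_tr.iloc[:25])\n",
"shap.summary_plot(sv, X_tr.iloc[:25], feature_names=list(X_tr.columns))"
]
},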
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"from scipy.stats import ks_2samp\n",
"from zenml import step\n",
"from zenml import log_artifact_metadata\n",
"from typing import Dict\n",
"from typing_extensions import Annotated\n",
"\n",
"@step\n",
"def detect_data_drift(\n",
"    X_train: pd.DataFrame,\n",
"    X_test: pd.DataFrame,\n",
") -> Annotated[Dict[str, float], \"drift_metrics\"]:\n",
"    \"\"\"Detect data drift between training and test sets.\"\"\"\n",
"    drift_metrics = {}\n",
"    for column in X_train.columns:\n",
"        _, p_value = ks_2samp(X_train[column], X_test[column])\n",
"        drift_metrics[column] = float(p_value)  # plain float, per the annotation\n",
"\n",
"    log_artifact_metadata(\n",
"        artifact_name=\"drift_metrics\",\n",
"        metadata={\n",
"            \"drift_summary\": {\n",
"                \"high_drift_features\": [col for col, p in drift_metrics.items() if p < 0.05]\n",
"            }\n",
"        }\n",
"    )\n",
"\n",
"    return drift_metrics"
]
},
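{
"cell_type": "markdown",
"metadata": {},
"source": [
"The drift step treats a Kolmogorov-Smirnov p-value below 0.05 as evidence that a feature's train and test distributions differ. A standalone sketch of that behavior on synthetic data (hypothetical example, not pipeline code):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"from scipy.stats import ks_2samp\n",
"\n",
"rng = np.random.default_rng(0)\n",
"a = rng.normal(0.0, 1.0, 500)\n",
"b_same = rng.normal(0.0, 1.0, 500)   # same distribution -> large p-value\n",
"b_shift = rng.normal(0.5, 1.0, 500)  # shifted mean -> small p-value (drift)\n",
"\n",
"print('same distribution: p =', ks_2samp(a, b_same).pvalue)\n",
"print('shifted mean:      p =', ks_2samp(a, b_shift).pvalue)"
]
},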
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"from zenml import pipeline, Model\n",
"from zenml.config import DockerSettings\n",
"\n",
"@pipeline(\n",
"    enable_cache=False,\n",
"    settings={\"docker\": DockerSettings(python_package_installer=\"uv\", requirements=\"requirements.txt\")},\n",
"    model=Model(name=\"high_risk_classification\")\n",
")\n",
"def iris_classification_pipeline():\n",
"    X_train, X_test, y_train, y_test = load_data()\n",
"    model = train_model(X_train, y_train)\n",
"    evaluate_model(model, X_test, y_test)\n",
"    explain_model(model, X_train)\n",
"    detect_data_drift(X_train, X_test)"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[1;35mInitiating a new run for the pipeline: \u001b[0m\u001b[1;36miris_classification_pipeline\u001b[1;35m.\u001b[0m\n",
"sagemaker.config INFO - Not applying SDK defaults from location: /Library/Application Support/sagemaker/config.yaml\n",
"sagemaker.config INFO - Not applying SDK defaults from location: /Users/htahir1/Library/Application Support/sagemaker/config.yaml\n",
"\u001b[1;35mArchiving notebook code...\u001b[0m\n",
"\u001b[33mCould not import GCP service connector: No module named 'google.api_core'.\u001b[0m\n",
"\u001b[33mCould not import Azure service connector: No module named 'azure'.\u001b[0m\n",
"\u001b[33mCould not import HyperAI service connector: No module named 'paramiko'.\u001b[0m\n",
"\u001b[1;35mUploading code to \u001b[0m\u001b[1;36ms3://zenml-cxwkvj-339712793861/code_uploads/a757941243d0d58c87b4f6982a9d6bcb5030cc7d.tar.gz\u001b[1;35m (Size: 1.83 KiB).\u001b[0m\n",
"\u001b[1;35mCode upload finished.\u001b[0m\n",
"\u001b[1;35mNew model version \u001b[0m\u001b[1;36m18\u001b[1;35m was created.\u001b[0m\n",
"\u001b[1;35mDashboard URL for Model Version with name 18 : \u001b[0m\u001b[34mhttps://cloud.zenml.io/organizations/fc992c14-d960-4db7-812e-8f070c99c6f0/tenants/939679ed-a10e-453d-8483-e1ac53649d42/model-versions/a1e2cdd1-1d84-4827-8dc7-54a72c6855ef\u001b[1;35m\u001b[0m\n",
"\u001b[33mUnable to use code repository to download code for this run as there are uncommitted changes.\u001b[0m\n",
"\u001b[1;35mUnable to find a build to reuse. A previous build can be reused when the following conditions are met:\n",
"  * The existing build was created for the same stack, ZenML version and Python version\n",
"  * The stack contains a container registry\n",
"  * The Docker settings of the pipeline and all its steps are the same as for the existing build.\u001b[0m\n",
"\u001b[1;35mBuilding Docker image(s) for pipeline \u001b[0m\u001b[1;36miris_classification_pipeline\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mBuilding Docker image \u001b[0m\u001b[1;36m339712793861.dkr.ecr.us-east-2.amazonaws.com/zenml-cxwkvj:iris_classification_pipeline-orchestrator\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35m- Including stack requirements: \u001b[0m\u001b[1;36maws-profile-manager\u001b[1;35m, \u001b[0m\u001b[1;36mboto3\u001b[1;35m, \u001b[0m\u001b[1;36mkubernetes\u001b[1;35m, \u001b[0m\u001b[1;36ms3fs>2022.3.0\u001b[1;35m, \u001b[0m\u001b[1;36msagemaker>=2.117.0\u001b[1;35m\u001b[0m\n",
"\u001b[1;35m- Including user-defined requirements from file \u001b[0m\u001b[1;36m/Users/htahir1/Workspace/zenml-projects/explainability-shap/requirements.txt\u001b[1;35m\u001b[0m\n",
"\u001b[1;35mStep 1/12 : FROM zenmldocker/zenml:0.64.0-py3.9\u001b[0m\n",
"\u001b[1;35mStep 2/12 : WORKDIR /app\u001b[0m\n",
"\u001b[1;35mStep 3/12 : ENV ZENML_LOGGING_COLORS_DISABLED=False\u001b[0m\n",
"\u001b[1;35mStep 4/12 : COPY .zenml_stack_integration_requirements .\u001b[0m\n",
"\u001b[1;35mStep 5/12 : RUN pip install --no-cache-dir --default-timeout=60 -r .zenml_stack_integration_requirements\u001b[0m\n",
"\u001b[1;35mStep 6/12 : COPY .zenml_user_requirements .\u001b[0m\n",
"\u001b[1;35mStep 7/12 : RUN pip install --no-cache-dir --default-timeout=60 -r .zenml_user_requirements\u001b[0m\n"
]
}
],
"source": [
"# Run the pipeline\n",
"iris_classification_pipeline()"
]
}
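,
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Once the run finishes, its outputs can be pulled back through the ZenML client. A sketch, assuming this ZenML version exposes `get_pipeline(...).last_run` and per-step `outputs` (check the client docs for the installed release):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from zenml.client import Client\n",
"\n",
"# Hypothetical inspection snippet: fetch the latest run and load one output.\n",
"run = Client().get_pipeline(\"iris_classification_pipeline\").last_run\n",
"print(run.status)\n",
"drift = run.steps[\"detect_data_drift\"].outputs[\"drift_metrics\"].load()\n",
"print(drift)"
]
}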
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.6"
}
},
"nbformat": 4,
"nbformat_minor": 4
} |