diff --git a/explainability-shap/run.ipynb b/explainability-shap/run.ipynb new file mode 100644 index 00000000..94e8ee4c --- /dev/null +++ b/explainability-shap/run.ipynb @@ -0,0 +1,349 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Iris Classification Pipeline with ZenML\n", + "\n", + "This notebook demonstrates a ZenML pipeline for iris classification, including data loading, model training, evaluation, explainability, and data drift detection." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from zenml.client import Client" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "Client().activate_stack(\"default\")\n", + "# Client().activate_stack(\"ihopeitworks2\")" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "import numpy as np\n", + "from sklearn.datasets import load_iris\n", + "from sklearn.model_selection import train_test_split\n", + "from zenml import step\n", + "from zenml import log_artifact_metadata\n", + "from typing import Tuple, Dict, Any\n", + "from typing_extensions import Annotated\n", + "\n", + "def safe_metadata(data: Any) -> Dict[str, Any]:\n", + " \"\"\"Create metadata dict with only supported types.\"\"\"\n", + " metadata = {\"shape\": data.shape}\n", + " if isinstance(data, pd.DataFrame):\n", + " metadata[\"columns\"] = list(data.columns)\n", + " return metadata\n", + "\n", + "@step\n", + "def load_data() -> Tuple[\n", + " Annotated[pd.DataFrame, \"X_train\"],\n", + " Annotated[pd.DataFrame, \"X_test\"],\n", + " Annotated[pd.Series, \"y_train\"],\n", + " Annotated[pd.Series, \"y_test\"],\n", + "]:\n", + " \"\"\"Load the iris dataset and split into train and test sets.\"\"\"\n", + " iris = load_iris(as_frame=True)\n", + " X = iris.data\n", + " y = iris.target\n", + " X_train, X_test, 
y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)\n", + "\n", + " for name, data in [(\"X_train\", X_train), (\"X_test\", X_test), (\"y_train\", y_train), (\"y_test\", y_test)]:\n", + " log_artifact_metadata(\n", + " artifact_name=name,\n", + " metadata={\"dataset_info\": safe_metadata(data)}\n", + " )\n", + "\n", + " return X_train, X_test, y_train, y_test" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "from sklearn.svm import SVC\n", + "from zenml import step, ArtifactConfig\n", + "from zenml import log_model_metadata, log_artifact_metadata\n", + "from typing_extensions import Annotated\n", + "\n", + "@step\n", + "def train_model(\n", + " X_train: pd.DataFrame,\n", + " y_train: pd.Series,\n", + ") -> Annotated[SVC, ArtifactConfig(name=\"model\", is_model_artifact=True)]:\n", + " \"\"\"Train an SVM classifier.\"\"\"\n", + " model = SVC(kernel='rbf', probability=True)\n", + " model.fit(X_train, y_train)\n", + " train_accuracy = model.score(X_train, y_train)\n", + "\n", + " log_model_metadata(\n", + " metadata={\n", + " \"training_metrics\": {\n", + " \"train_accuracy\": float(train_accuracy),\n", + " },\n", + " \"model_info\": {\n", + " \"model_type\": type(model).__name__,\n", + " \"kernel\": model.kernel,\n", + " }\n", + " }\n", + " )\n", + "\n", + " log_artifact_metadata(\n", + " artifact_name=\"model\",\n", + " metadata={\n", + " \"model_details\": {\n", + " \"type\": type(model).__name__,\n", + " \"kernel\": model.kernel,\n", + " \"n_support\": model.n_support_.tolist(),\n", + " }\n", + " }\n", + " )\n", + "\n", + " return model" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "import numpy as np\n", + "from sklearn.svm import SVC\n", + "from zenml import step\n", + "from zenml import log_model_metadata, log_artifact_metadata\n", + "from typing import 
Tuple\n", + "from typing_extensions import Annotated\n", + "\n", + "@step\n", + "def evaluate_model(\n", + " model: SVC,\n", + " X_test: pd.DataFrame,\n", + " y_test: pd.Series,\n", + ") -> Tuple[\n", + " Annotated[np.ndarray, \"predictions\"],\n", + " Annotated[np.ndarray, \"probabilities\"]\n", + "]:\n", + " \"\"\"Evaluate the model and make predictions.\"\"\"\n", + " test_accuracy = model.score(X_test, y_test)\n", + " predictions = model.predict(X_test)\n", + " probabilities = model.predict_proba(X_test)\n", + "\n", + " log_model_metadata(\n", + " metadata={\n", + " \"evaluation_metrics\": {\n", + " \"test_accuracy\": float(test_accuracy),\n", + " }\n", + " }\n", + " )\n", + "\n", + " log_artifact_metadata(\n", + " artifact_name=\"predictions\",\n", + " metadata={\n", + " \"prediction_info\": {\n", + " \"shape\": predictions.shape,\n", + " \"unique_values\": np.unique(predictions).tolist()\n", + " }\n", + " }\n", + " )\n", + "\n", + " log_artifact_metadata(\n", + " artifact_name=\"probabilities\",\n", + " metadata={\n", + " \"probability_info\": {\n", + " \"shape\": probabilities.shape,\n", + " \"min\": float(np.min(probabilities)),\n", + " \"max\": float(np.max(probabilities))\n", + " }\n", + " }\n", + " )\n", + "\n", + " return predictions, probabilities" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "import shap\n", + "from sklearn.svm import SVC\n", + "from zenml import step\n", + "from zenml import log_artifact_metadata\n", + "from typing_extensions import Annotated\n", + "\n", + "class SHAPVisualization:\n", + " def __init__(self, shap_values, feature_names):\n", + " self.shap_values = shap_values\n", + " self.feature_names = feature_names\n", + "\n", + "@step\n", + "def explain_model(\n", + " model: SVC,\n", + " X_train: pd.DataFrame\n", + ") -> Annotated[SHAPVisualization, \"shap_visualization\"]:\n", + " \"\"\"Generate SHAP values for model explainability and 
create a visualization.\"\"\"\n", + " explainer = shap.KernelExplainer(model.predict_proba, shap.sample(X_train, 100))\n", + " shap_values = explainer.shap_values(X_train.iloc[:100])\n", + "\n", + " log_artifact_metadata(\n", + " artifact_name=\"shap_visualization\",\n", + " metadata={\n", + " \"shap_info\": {\n", + " \"shape\": [arr.shape for arr in shap_values],\n", + " \"n_classes\": len(shap_values),\n", + " \"n_features\": shap_values[0].shape[1],\n", + " }\n", + " }\n", + " )\n", + "\n", + " return SHAPVisualization(shap_values, X_train.columns)" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "from scipy.stats import ks_2samp\n", + "from zenml import step\n", + "from zenml import log_artifact_metadata\n", + "from typing import Dict\n", + "from typing_extensions import Annotated\n", + "\n", + "@step\n", + "def detect_data_drift(\n", + " X_train: pd.DataFrame,\n", + " X_test: pd.DataFrame,\n", + ") -> Annotated[Dict[str, float], \"drift_metrics\"]:\n", + " \"\"\"Detect data drift between training and test sets.\"\"\"\n", + " drift_metrics = {}\n", + " for column in X_train.columns:\n", + " _, p_value = ks_2samp(X_train[column], X_test[column])\n", + " drift_metrics[column] = p_value\n", + "\n", + " log_artifact_metadata(\n", + " artifact_name=\"drift_metrics\",\n", + " metadata={\n", + " \"drift_summary\": {\n", + " \"high_drift_features\": [col for col, p in drift_metrics.items() if p < 0.05]\n", + " }\n", + " }\n", + " )\n", + "\n", + " return drift_metrics" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "from zenml import pipeline, Model\n", + "from zenml.config import DockerSettings\n", + "\n", + "@pipeline(\n", + " enable_cache=False,\n", + " settings={\"docker\": DockerSettings(python_package_installer=\"uv\", requirements=\"requirements.txt\")},\n", + " model=Model(name=\"high_risk_classification\")\n", +
")\n", + "def iris_classification_pipeline():\n", + " X_train, X_test, y_train, y_test = load_data()\n", + " model = train_model(X_train, y_train)\n", + " evaluate_model(model, X_test, y_test)\n", + " explain_model(model, X_train)\n", + " drift_metrics = detect_data_drift(X_train, X_test)" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\u001b[1;35mInitiating a new run for the pipeline: \u001b[0m\u001b[1;36miris_classification_pipeline\u001b[1;35m.\u001b[0m\n", + "sagemaker.config INFO - Not applying SDK defaults from location: /Library/Application Support/sagemaker/config.yaml\n", + "sagemaker.config INFO - Not applying SDK defaults from location: /Users/htahir1/Library/Application Support/sagemaker/config.yaml\n", + "\u001b[1;35mArchiving notebook code...\u001b[0m\n", + "\u001b[33mCould not import GCP service connector: No module named 'google.api_core'.\u001b[0m\n", + "\u001b[33mCould not import Azure service connector: No module named 'azure'.\u001b[0m\n", + "\u001b[33mCould not import HyperAI service connector: No module named 'paramiko'.\u001b[0m\n", + "\u001b[1;35mUploading code to \u001b[0m\u001b[1;36ms3://zenml-cxwkvj-339712793861/code_uploads/a757941243d0d58c87b4f6982a9d6bcb5030cc7d.tar.gz\u001b[1;35m (Size: 1.83 KiB).\u001b[0m\n", + "\u001b[1;35mCode upload finished.\u001b[0m\n", + "\u001b[1;35mNew model version \u001b[0m\u001b[1;36m18\u001b[1;35m was created.\u001b[0m\n", + "\u001b[1;35mDashboard URL for Model Version with name 18 : \u001b[0m\u001b[34mhttps://cloud.zenml.io/organizations/fc992c14-d960-4db7-812e-8f070c99c6f0/tenants/939679ed-a10e-453d-8483-e1ac53649d42/model-versions/a1e2cdd1-1d84-4827-8dc7-54a72c6855ef\u001b[1;35m\u001b[0m\n", + "\u001b[33mUnable to use code repository to download code for this run as there are uncommitted changes.\u001b[0m\n", + "\u001b[1;35mUnable to find a build to reuse. 
A previous build can be reused when the following conditions are met:\n", + " * The existing build was created for the same stack, ZenML version and Python version\n", + " * The stack contains a container registry\n", + " * The Docker settings of the pipeline and all its steps are the same as for the existing build.\u001b[0m\n", + "\u001b[1;35mBuilding Docker image(s) for pipeline \u001b[0m\u001b[1;36miris_classification_pipeline\u001b[1;35m.\u001b[0m\n", + "\u001b[1;35mBuilding Docker image \u001b[0m\u001b[1;36m339712793861.dkr.ecr.us-east-2.amazonaws.com/zenml-cxwkvj:iris_classification_pipeline-orchestrator\u001b[1;35m.\u001b[0m\n", + "\u001b[1;35m- Including stack requirements: \u001b[0m\u001b[1;36maws-profile-manager\u001b[1;35m, \u001b[0m\u001b[1;36mboto3\u001b[1;35m, \u001b[0m\u001b[1;36mkubernetes\u001b[1;35m, \u001b[0m\u001b[1;36ms3fs>2022.3.0\u001b[1;35m, \u001b[0m\u001b[1;36msagemaker>=2.117.0\u001b[1;35m\u001b[0m\n", + "\u001b[1;35m- Including user-defined requirements from file \u001b[0m\u001b[1;36m/Users/htahir1/Workspace/zenml-projects/explainability-shap/requirements.txt\u001b[1;35m\u001b[0m\n", + "\u001b[1;35mStep 1/12 : FROM zenmldocker/zenml:0.64.0-py3.9\u001b[0m\n", + "\u001b[1;35mStep 2/12 : WORKDIR /app\u001b[0m\n", + "\u001b[1;35mStep 3/12 : ENV ZENML_LOGGING_COLORS_DISABLED=False\u001b[0m\n", + "\u001b[1;35mStep 4/12 : COPY .zenml_stack_integration_requirements .\u001b[0m\n", + "\u001b[1;35mStep 5/12 : RUN pip install --no-cache-dir --default-timeout=60 -r .zenml_stack_integration_requirements\u001b[0m\n", + "\u001b[1;35mStep 6/12 : COPY .zenml_user_requirements .\u001b[0m\n", + "\u001b[1;35mStep 7/12 : RUN pip install --no-cache-dir --default-timeout=60 -r .zenml_user_requirements\u001b[0m\n" + ] + } + ], + "source": [ + "\n", + "# Run the pipeline\n", + "iris_classification_pipeline()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + 
"codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.6" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}