Skip to content

Commit

Permalink
Merge branch 'develop' into docs/finetune-embeddings-guide
Browse files Browse the repository at this point in the history
  • Loading branch information
strickvl authored Aug 8, 2024
2 parents 7e03183 + 6730a16 commit e0a68c3
Show file tree
Hide file tree
Showing 14 changed files with 261 additions and 10 deletions.
6 changes: 4 additions & 2 deletions docs/book/component-guide/orchestrators/azureml.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ You should use the AzureML orchestrator if:
The ZenML AzureML orchestrator implementation uses [the Python SDK v2 of
AzureML](https://learn.microsoft.com/en-gb/python/api/overview/azure/ai-ml-readme?view=azure-python)
to allow our users to build their Machine Learning pipelines. For each ZenML step,
it creates an AzureML `[CommandComponent](https://learn.microsoft.com/en-us/python/api/azure-ai-ml/azure.ai.ml.entities.commandcomponent?view=azure-python)` and brings them together in a pipeline.
it creates an AzureML [CommandComponent](https://learn.microsoft.com/en-us/python/api/azure-ai-ml/azure.ai.ml.entities.commandcomponent?view=azure-python)
and brings them together in a pipeline.

## How to deploy it

Expand Down Expand Up @@ -149,7 +150,8 @@ def pipeline():
### Run pipelines on a schedule

The AzureML orchestrator supports running pipelines on a schedule using
its `[JobSchedules](https://learn.microsoft.com/en-us/azure/templates/microsoft.automation/2023-11-01/automationaccounts/jobschedules?pivots=deployment-language-bicep)`. Both cron expression and intervals are supported.
its [JobSchedules](https://learn.microsoft.com/en-us/azure/machine-learning/how-to-schedule-pipeline-job?view=azureml-api-2&tabs=python).
Both cron expressions and intervals are supported.

```python
from zenml.config.schedule import Schedule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ ZenML steps and pipelines can be defined in a Jupyter notebook and executed remo

Learn more about it in the following sections:

<table data-view="cards"><thead><tr><th></th><th></th><th></th><th data-hidden data-card-target data-type="content-ref"></th></tr></thead><tbody><tr><td>Define steps in notebook cells</td><td></td><td></td><td><a href="define-steps-in-notebook-cells.md">define-steps-in-notebook-cells.md</a></td></tr><tr><td>Configure the notebook path</td><td></td><td></td><td></td></tr></tbody></table>
<table data-view="cards"><thead><tr><th></th><th></th><th></th><th data-hidden data-card-target data-type="content-ref"></th></tr></thead><tbody><tr><td>Define steps in notebook cells</td><td></td><td></td><td><a href="define-steps-in-notebook-cells.md">define-steps-in-notebook-cells.md</a></td></tr></tbody></table>

<!-- For scarf -->
<figure><img alt="ZenML Scarf" referrerpolicy="no-referrer-when-downgrade" src="https://static.scarf.sh/a.png?x-pxid=f0b4f458-0a54-4fcd-aa95-d5ee424815bc" /></figure>
8 changes: 8 additions & 0 deletions src/zenml/cli/web_login.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
DEVICE_AUTHORIZATION,
LOGIN,
VERSION_1,
ZENML_PRO_CONNECTION_ISSUES_SUSPENDED_PAUSED_TENANT_HINT,
)
from zenml.exceptions import AuthorizationException, OAuthError
from zenml.logger import get_logger
Expand Down Expand Up @@ -93,6 +94,11 @@ def web_login(url: str, verify_ssl: Union[str, bool]) -> str:
# Get rid of any trailing slashes to prevent issues when having double
# slashes in the URL
url = url.rstrip("/")
zenml_pro_extra = ""
if ".zenml.io" in url:
zenml_pro_extra = (
ZENML_PRO_CONNECTION_ISSUES_SUSPENDED_PAUSED_TENANT_HINT
)
try:
auth_url = url + API + VERSION_1 + DEVICE_AUTHORIZATION
response = requests.post(
Expand All @@ -111,6 +117,7 @@ def web_login(url: str, verify_ssl: Union[str, bool]) -> str:
logger.info(f"Error: {response.status_code} {response.text}")
raise AuthorizationException(
"Could not connect to API server. Please check the URL."
+ zenml_pro_extra
)
except (requests.exceptions.JSONDecodeError, ValueError, TypeError):
logger.exception("Bad response received from API server.")
Expand All @@ -121,6 +128,7 @@ def web_login(url: str, verify_ssl: Union[str, bool]) -> str:
logger.exception("Could not connect to API server.")
raise AuthorizationException(
"Could not connect to API server. Please check the URL."
+ zenml_pro_extra
)

# Open the verification URL in the user's browser
Expand Down
9 changes: 9 additions & 0 deletions src/zenml/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ def handle_int_env_var(var: str, default: int = 0) -> int:
)
DEFAULT_ZENML_SERVER_SECURE_HEADERS_REPORT_TO = "default"
DEFAULT_ZENML_SERVER_USE_LEGACY_DASHBOARD = False
# Minimum number of seconds between two writes of the last user activity
# timestamp to the database by the server.
DEFAULT_ZENML_SERVER_REPORT_USER_ACTIVITY_TO_DB_SECONDS = 30

# Configurations to decide which resources report their usage and check for
# entitlement in the case of a cloud deployment. Expected Format is this:
Expand Down Expand Up @@ -492,3 +493,11 @@ def handle_int_env_var(var: str, default: int = 0) -> int:


STACK_DEPLOYMENT_API_TOKEN_EXPIRATION = 60 * 6 # 6 hours

# ZenML Pro
# Hint that is appended to connection error messages when the target URL
# contains `.zenml.io`. It starts with a newline so that it can be
# concatenated directly onto the end of another message.
ZENML_PRO_CONNECTION_ISSUES_SUSPENDED_PAUSED_TENANT_HINT = (
    "\nHINT: Since you are trying to communicate with the ZenML Pro "
    "Tenant, please make sure that your tenant is in RUNNING state on "
    "your Organization page. If the tenant is PAUSED you can `Resume` it "
    "via UI and try again."
)
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,9 @@ def _upload_and_run_pipeline(
run_name: Orchestrator run name.
settings: Pipeline level settings for this orchestrator.
schedule: The schedule the pipeline will run on.
Raises:
RuntimeError: If the Vertex Orchestrator fails to provision or any other Runtime errors
"""
# We have to replace the hyphens in the run name with underscores
# and lower case the string, because the Vertex AI Pipelines service
Expand Down Expand Up @@ -656,13 +659,15 @@ def _upload_and_run_pipeline(
run.wait()

except google_exceptions.ClientError as e:
logger.warning(
"Failed to create the Vertex AI Pipelines job: %s", e
logger.error("Failed to create the Vertex AI Pipelines job: %s", e)
raise RuntimeError(
f"Failed to create the Vertex AI Pipelines job: {e}"
)
except RuntimeError as e:
logger.error(
"The Vertex AI Pipelines job execution has failed: %s", e
)
raise

def get_orchestrator_run_id(self) -> str:
"""Returns the active orchestrator run id.
Expand Down
2 changes: 1 addition & 1 deletion src/zenml/integrations/mlflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class MlflowIntegration(Integration):
NAME = MLFLOW

REQUIREMENTS = [
"mlflow>=2.1.1,<=2.14.2",
"mlflow>=2.1.1,<3",
"mlserver>=1.3.3",
"mlserver-mlflow>=1.3.3",
# TODO: remove this requirement once rapidjson is fixed
Expand Down
12 changes: 12 additions & 0 deletions src/zenml/models/v2/core/server_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ class ServerSettingsResponseBody(BaseResponseBody):
display_updates: Optional[bool] = Field(
title="Whether to display notifications about ZenML updates in the dashboard.",
)
last_user_activity: datetime = Field(
title="The timestamp when the last user activity was detected.",
)
updated: datetime = Field(
title="The timestamp when this resource was last updated."
)
Expand Down Expand Up @@ -179,6 +182,15 @@ def active(self) -> bool:
"""
return self.get_body().active

@property
def last_user_activity(self) -> datetime:
    """The `last_user_activity` property.

    Returns:
        The `last_user_activity` value read from the response body.
    """
    body = self.get_body()
    return body.last_user_activity

@property
def updated(self) -> datetime:
"""The `updated` property.
Expand Down
8 changes: 7 additions & 1 deletion src/zenml/models/v2/misc/server_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
# permissions and limitations under the License.
"""Model definitions for ZenML servers."""

from typing import Dict
from datetime import datetime
from typing import Dict, Optional
from uuid import UUID, uuid4

from pydantic import BaseModel, Field
Expand Down Expand Up @@ -103,6 +104,11 @@ class ServerModel(BaseModel):
title="Flag to indicate whether the server is using the legacy dashboard.",
)

last_user_activity: Optional[datetime] = Field(
None,
title="Timestamp of latest user activity traced on the server.",
)

def is_local(self) -> bool:
"""Return whether the server is running locally.
Expand Down
75 changes: 75 additions & 0 deletions src/zenml/zen_server/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
import os
from functools import wraps
from typing import (
TYPE_CHECKING,
Any,
Callable,
List,
Optional,
Tuple,
Type,
Expand All @@ -33,7 +35,10 @@
from zenml.config.global_config import GlobalConfiguration
from zenml.config.server_config import ServerConfiguration
from zenml.constants import (
API,
ENV_ZENML_SERVER,
INFO,
VERSION_1,
)
from zenml.enums import ServerProviderType
from zenml.exceptions import IllegalOperationError, OAuthError
Expand All @@ -53,6 +58,9 @@
)
from zenml.zen_stores.sql_zen_store import SqlZenStore

if TYPE_CHECKING:
from fastapi import Request

logger = get_logger(__name__)

_zen_store: Optional["SqlZenStore"] = None
Expand Down Expand Up @@ -570,3 +578,70 @@ def verify_admin_status_if_no_rbac(
"without RBAC enabled.",
)
return


def is_user_request(request: "Request") -> bool:
    """Classify an incoming request as user-initiated or system-generated.

    Paths under the versioned API prefix are user requests, except for the
    explicitly excluded endpoints. Any other request is considered a user
    request unless one of the system heuristics below matches (system
    path, marker header, monitoring user agent, internal client address,
    OPTIONS method or the `system_check` query parameter).

    Args:
        request: The incoming FastAPI request object.

    Returns:
        True if it's a user request, False otherwise.
    """
    path = request.url.path
    api_root = f"{API}{VERSION_1}"

    # API endpoints that are explicitly not counted as user activity.
    ignored_api_paths = {api_root + suffix for suffix in [INFO]}
    if path in ignored_api_paths:
        return False

    # Everything else below the versioned API prefix is a user request.
    if path.startswith(api_root):
        return True

    # Paths that are treated as system traffic.
    system_prefixes = (
        "/health",
        "/metrics",
        "/system",
        "/docs",
        "/redoc",
        "/openapi.json",
    )
    if path.startswith(system_prefixes):
        return False

    # Requests that flag themselves as system requests via a header.
    headers = request.headers
    if headers.get("X-System-Request") == "true":
        return False

    # Requests from certain user agents (e.g., monitoring tools).
    agent = headers.get("User-Agent", "").lower()
    for marker in ("prometheus", "datadog", "newrelic", "pingdom"):
        if marker in agent:
            return False

    # Requests whose client address starts with an internal prefix.
    client = request.client
    host = client.host if client else None
    if host and host.startswith(("10.", "192.168.")):
        return False

    # OPTIONS requests (often used for CORS preflight).
    if request.method == "OPTIONS":
        return False

    # Query parameters that might indicate system requests.
    if request.query_params.get("system_check"):
        return False

    # No system heuristic matched: consider it a user request.
    return True
53 changes: 52 additions & 1 deletion src/zenml/zen_server/zen_server_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import os
from asyncio.log import logger
from datetime import datetime, timedelta, timezone
from genericpath import isfile
from typing import Any, List

Expand All @@ -36,7 +37,11 @@

import zenml
from zenml.analytics import source_context
from zenml.constants import API, HEALTH
from zenml.constants import (
API,
DEFAULT_ZENML_SERVER_REPORT_USER_ACTIVITY_TO_DB_SECONDS,
HEALTH,
)
from zenml.enums import AuthScheme, SourceContextTypes
from zenml.zen_server.exceptions import error_detail
from zenml.zen_server.routers import (
Expand Down Expand Up @@ -80,8 +85,10 @@
initialize_secure_headers,
initialize_workload_manager,
initialize_zen_store,
is_user_request,
secure_headers,
server_config,
zen_store,
)

if server_config().use_legacy_dashboard:
Expand Down Expand Up @@ -109,6 +116,12 @@ def relative_path(rel: str) -> str:
default_response_class=ORJSONResponse,
)

# Module-level activity markers. The "reported" marker starts one full
# reporting interval in the past, so the elapsed-time check against
# DEFAULT_ZENML_SERVER_REPORT_USER_ACTIVITY_TO_DB_SECONDS already passes on
# the first evaluation.
last_user_activity: datetime = datetime.now(timezone.utc)
last_user_activity_reported: datetime = datetime.now(
    timezone.utc
) - timedelta(seconds=DEFAULT_ZENML_SERVER_REPORT_USER_ACTIVITY_TO_DB_SECONDS)


# Customize the default request validation handler that comes with FastAPI
# to return a JSON response that matches the ZenML API spec.
Expand Down Expand Up @@ -159,6 +172,44 @@ async def set_secure_headers(request: Request, call_next: Any) -> Any:
return response


@app.middleware("http")
async def track_last_user_activity(request: Request, call_next: Any) -> Any:
    """A middleware to track last user activity.

    This middleware checks if the incoming request is a user request and
    updates the in-memory last activity timestamp if it is. The timestamp
    is written to the store at most once every
    `DEFAULT_ZENML_SERVER_REPORT_USER_ACTIVITY_TO_DB_SECONDS` seconds.

    Activity tracking is best-effort: errors raised while classifying the
    request or while writing the timestamp to the store are logged and do
    not prevent the request from being processed.

    Args:
        request: The incoming request object.
        call_next: A function that will receive the request as a parameter and
            pass it to the corresponding path operation.

    Returns:
        The response to the request.
    """
    global last_user_activity
    global last_user_activity_reported

    # Read the clock once so that the activity stamp and the throttle check
    # are based on the same instant.
    now = datetime.now(timezone.utc)

    try:
        if is_user_request(request):
            last_user_activity = now
    except Exception as e:
        logger.debug(
            f"An unexpected error occurred while checking user activity: {e}"
        )

    seconds_since_report = (now - last_user_activity_reported).total_seconds()
    if (
        seconds_since_report
        > DEFAULT_ZENML_SERVER_REPORT_USER_ACTIVITY_TO_DB_SECONDS
    ):
        # Advance the marker before writing, so that a failing store is
        # retried at most once per reporting interval instead of on every
        # request.
        last_user_activity_reported = now
        try:
            zen_store()._update_last_user_activity_timestamp(
                last_user_activity=last_user_activity
            )
        except Exception as e:
            # Bookkeeping must never turn into a failed user request.
            logger.warning(
                f"Failed to persist the last user activity timestamp: {e}"
            )

    return await call_next(request)


@app.middleware("http")
async def infer_source_context(request: Request, call_next: Any) -> Any:
"""A middleware to track the source of an event.
Expand Down
8 changes: 7 additions & 1 deletion src/zenml/zen_stores/base_zen_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
DEFAULT_WORKSPACE_NAME,
ENV_ZENML_DEFAULT_WORKSPACE_NAME,
IS_DEBUG_ENV,
ZENML_PRO_CONNECTION_ISSUES_SUSPENDED_PAUSED_TENANT_HINT,
)
from zenml.enums import (
SecretsStoreType,
Expand Down Expand Up @@ -171,9 +172,14 @@ def __init__(
)

except Exception as e:
zenml_pro_extra = ""
if ".zenml.io" in self.url:
zenml_pro_extra = (
ZENML_PRO_CONNECTION_ISSUES_SUSPENDED_PAUSED_TENANT_HINT
)
raise RuntimeError(
f"Error initializing {self.type.value} store with URL "
f"'{self.url}': {str(e)}"
f"'{self.url}': {str(e)}" + zenml_pro_extra
) from e

if not skip_default_registrations:
Expand Down
Loading

0 comments on commit e0a68c3

Please sign in to comment.