Enable running a single step on the active stack (#2942)
* POC to run single steps on stack

* Run sync if possible

* Some cleanup

* Rename env variable

* Invert logic to make it easier

* Docs

* Fix tests

* Fix unit tests

* Add note on unlisted pipeline runs

* Update docs/book/how-to/build-pipelines/run-an-individual-step.md

Co-authored-by: Hamza Tahir <[email protected]>

* Apply review comments

---------

Co-authored-by: Hamza Tahir <[email protected]>
schustmi and htahir1 authored Aug 27, 2024
1 parent 25d8023 commit a15a24c
Showing 11 changed files with 260 additions and 27 deletions.
71 changes: 71 additions & 0 deletions docs/book/how-to/build-pipelines/run-an-individual-step.md
@@ -0,0 +1,71 @@
# Run an individual step on your stack

If you want to run just an individual step on your stack, you can simply call the step
as you would a normal Python function. ZenML will internally create a pipeline containing
just your step and run it on the active stack.

{% hint style="info" %}
The pipeline run that executes your step will be `unlisted`, which means it
will not be associated with any pipeline. You can still see the run in the "Runs" tab of the
dashboard.
{% endhint %}

```python
from typing import Tuple

import pandas as pd
from sklearn.base import ClassifierMixin
from sklearn.svm import SVC
from typing_extensions import Annotated

from zenml import step

# Configure the step to use a step operator. If you're not using
# a step operator, you can remove this and the step will run on
# your orchestrator instead.
@step(step_operator="<STEP_OPERATOR_NAME>")
def svc_trainer(
    X_train: pd.DataFrame,
    y_train: pd.Series,
    gamma: float = 0.001,
) -> Tuple[
    Annotated[ClassifierMixin, "trained_model"],
    Annotated[float, "training_acc"],
]:
    """Train a sklearn SVC classifier."""
    model = SVC(gamma=gamma)
    model.fit(X_train.to_numpy(), y_train.to_numpy())

    train_acc = model.score(X_train.to_numpy(), y_train.to_numpy())
    print(f"Train accuracy: {train_acc}")

    return model, train_acc


X_train = pd.DataFrame(...)
y_train = pd.Series(...)

# Call the step directly. This will internally create a
# pipeline with just this step, which will be executed on
# the active stack.
model, train_acc = svc_trainer(X_train=X_train, y_train=y_train)
```

## Run the underlying step function directly

If you instead want to run your step function without ZenML getting involved, you
can use the `entrypoint(...)` method of a step:

```python
X_train = pd.DataFrame(...)
y_train = pd.Series(...)

model, train_acc = svc_trainer.entrypoint(X_train=X_train, y_train=y_train)
```

{% hint style="info" %}
If you want to make this the default behavior when calling a step, you
can set the `ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK` environment variable to `True`.
Once you do that, calling `svc_trainer(...)` will simply call the underlying function and
not use your ZenML stack.
{% endhint %}
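For example, the variable can be set from Python before the step is called (a minimal sketch; setting it in the shell environment of the calling process works just as well):

```python
import os

# With this variable set, calling a step outside of a pipeline executes
# the underlying function directly instead of running a single-step
# pipeline on the active stack.
os.environ["ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK"] = "True"
```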

<!-- For scarf -->
<figure><img alt="ZenML Scarf" referrerpolicy="no-referrer-when-downgrade" src="https://static.scarf.sh/a.png?x-pxid=f0b4f458-0a54-4fcd-aa95-d5ee424815bc" /></figure>
@@ -8,7 +8,7 @@ ZenML steps and pipelines can be defined in a Jupyter notebook and executed remo

Learn more about it in the following sections:

<table data-view="cards"><thead><tr><th></th><th></th><th></th><th data-hidden data-card-target data-type="content-ref"></th></tr></thead><tbody><tr><td>Define steps in notebook cells</td><td></td><td></td><td><a href="define-steps-in-notebook-cells.md">define-steps-in-notebook-cells.md</a></td></tr></tbody></table>
<table data-view="cards"><thead><tr><th></th><th></th><th></th><th data-hidden data-card-target data-type="content-ref"></th></tr></thead><tbody><tr><td>Limitations of defining steps in notebook cells</td><td></td><td></td><td><a href="limitations-of-defining-steps-in-notebook-cells.md">limitations-of-defining-steps-in-notebook-cells.md</a></td></tr><tr><td>Run a single step from a notebook</td><td></td><td></td><td><a href="run-a-single-step-from-a-notebook.md">run-a-single-step-from-a-notebook.md</a></td></tr></tbody></table>

<!-- For scarf -->
<figure><img alt="ZenML Scarf" referrerpolicy="no-referrer-when-downgrade" src="https://static.scarf.sh/a.png?x-pxid=f0b4f458-0a54-4fcd-aa95-d5ee424815bc" /></figure>
@@ -1,5 +1,4 @@

# Define steps in notebook cells
# Limitations of defining steps in notebook cells

If you want to run ZenML steps defined in notebook cells remotely (either with a remote [orchestrator](../../component-guide/orchestrators/orchestrators.md) or [step operator](../../component-guide/step-operators/step-operators.md)), the cells defining your steps must meet the following conditions:
- The cell can only contain python code, no Jupyter magic commands or shell commands starting with a `%` or `!`.
@@ -0,0 +1,52 @@
# Run a single step from a notebook

If you want to run just a single step remotely from a notebook, you can simply call the step
as you would a normal Python function. ZenML will internally create a pipeline containing
just your step and run it on the active stack.

{% hint style="warning" %}
When defining a step that should be run remotely in a notebook, make sure you're
aware of all the [limitations](limitations-of-defining-steps-in-notebook-cells.md) that apply.
{% endhint %}


```python
from typing import Tuple

import pandas as pd
from sklearn.base import ClassifierMixin
from sklearn.svm import SVC
from typing_extensions import Annotated

from zenml import step

# Configure the step to use a step operator. If you're not using
# a step operator, you can remove this and the step will run on
# your orchestrator instead.
@step(step_operator="<STEP_OPERATOR_NAME>")
def svc_trainer(
    X_train: pd.DataFrame,
    y_train: pd.Series,
    gamma: float = 0.001,
) -> Tuple[
    Annotated[ClassifierMixin, "trained_model"],
    Annotated[float, "training_acc"],
]:
    """Train a sklearn SVC classifier."""
    model = SVC(gamma=gamma)
    model.fit(X_train.to_numpy(), y_train.to_numpy())

    train_acc = model.score(X_train.to_numpy(), y_train.to_numpy())
    print(f"Train accuracy: {train_acc}")

    return model, train_acc


X_train = pd.DataFrame(...)
y_train = pd.Series(...)

# Call the step directly. This will internally create a
# pipeline with just this step, which will be executed on
# the active stack.
model, train_acc = svc_trainer(X_train=X_train, y_train=y_train)
```

<!-- For scarf -->
<figure><img alt="ZenML Scarf" referrerpolicy="no-referrer-when-downgrade" src="https://static.scarf.sh/a.png?x-pxid=f0b4f458-0a54-4fcd-aa95-d5ee424815bc" /></figure>
6 changes: 4 additions & 2 deletions docs/book/toc.md
@@ -84,6 +84,7 @@
* [Use failure/success hooks](how-to/build-pipelines/use-failure-success-hooks.md)
* [Hyperparameter tuning](how-to/build-pipelines/hyper-parameter-tuning.md)
* [Access secrets in a step](how-to/build-pipelines/access-secrets-in-a-step.md)
* [Run an individual step](how-to/build-pipelines/run-an-individual-step.md)
* [Fetching pipelines](how-to/build-pipelines/fetching-pipelines.md)
* [Get past pipeline/step runs](how-to/build-pipelines/get-past-pipeline-step-runs.md)
* [🚨 Trigger a pipeline](how-to/trigger-pipelines/README.md)
@@ -108,6 +109,9 @@
* [Which files are built into the image](how-to/customize-docker-builds/which-files-are-built-into-the-image.md)
* [Use code repositories to automate Docker build reuse](how-to/customize-docker-builds/use-code-repositories-to-speed-up-docker-build-times.md)
* [Define where an image is built](how-to/customize-docker-builds/define-where-an-image-is-built.md)
* [📔 Run remote pipelines from notebooks](how-to/run-remote-steps-and-pipelines-from-notebooks/README.md)
* [Limitations of defining steps in notebook cells](how-to/run-remote-steps-and-pipelines-from-notebooks/limitations-of-defining-steps-in-notebook-cells.md)
* [Run a single step from a notebook](how-to/run-remote-steps-and-pipelines-from-notebooks/run-a-single-step-from-a-notebook.md)
* [⚒️ Manage stacks & components](how-to/stack-deployment/README.md)
* [Deploy a cloud stack with ZenML](how-to/stack-deployment/deploy-a-cloud-stack.md)
* [Deploy a cloud stack with Terraform](how-to/stack-deployment/deploy-a-cloud-stack-with-terraform.md)
@@ -180,8 +184,6 @@
* [🔌 Connect to a server](how-to/connecting-to-zenml/README.md)
* [Connect in with your User (interactive)](how-to/connecting-to-zenml/connect-in-with-your-user-interactive.md)
* [Connect with a Service Account](how-to/connecting-to-zenml/connect-with-a-service-account.md)
* [📔 Run remote pipelines from notebooks](how-to/run-remote-pipelines-from-notebooks/README.md)
* [Define steps in notebook cells](how-to/run-remote-pipelines-from-notebooks/define-steps-in-notebook-cells.md)
* [🔐 Interact with secrets](how-to/interact-with-secrets.md)
* [🐞 Debug and solve issues](how-to/debug-and-solve-issues.md)

4 changes: 2 additions & 2 deletions docs/book/user-guide/starter-guide/create-an-ml-pipeline.md
@@ -194,10 +194,10 @@ def svc_trainer(
```

{% hint style="info" %}
If you want to run the step function outside the context of a ZenML pipeline, all you need to do is call the step function outside of a ZenML pipeline. For example:
If you want to run just a single step on your ZenML stack, all you need to do is call the step function outside of a ZenML pipeline. For example:

```python
svc_trainer(X_train=..., y_train=...)
model, train_acc = svc_trainer(X_train=..., y_train=...)
```
{% endhint %}

3 changes: 3 additions & 0 deletions src/zenml/constants.py
@@ -184,6 +184,9 @@ def handle_int_env_var(var: str, default: int = 0) -> int:
    f"{ENV_ZENML_SERVER_PREFIX}USE_LEGACY_DASHBOARD"
)
ENV_ZENML_SERVER_AUTO_ACTIVATE = f"{ENV_ZENML_SERVER_PREFIX}AUTO_ACTIVATE"
ENV_ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK = (
    "ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK"
)

# Logging variables
IS_DEBUG_ENV: bool = handle_bool_env_var(ENV_ZENML_DEBUG, default=False)
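The `handle_bool_env_var` helper used here interprets an environment variable as a boolean. A minimal sketch of such a parser (the accepted truthy spellings below are an assumption for illustration, not necessarily ZenML's exact implementation):

```python
import os

def handle_bool_env_var(var: str, default: bool = False) -> bool:
    """Interpret an environment variable as a boolean, falling back to
    ``default`` when the variable is unset."""
    value = os.environ.get(var)
    if value is None:
        return default
    # Which spellings count as "true" is an assumption for this sketch.
    return value.lower() in ("1", "y", "yes", "true")
```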
20 changes: 16 additions & 4 deletions src/zenml/steps/base_step.py
@@ -39,7 +39,11 @@
from zenml.client_lazy_loader import ClientLazyLoader
from zenml.config.retry_config import StepRetryConfig
from zenml.config.source import Source
from zenml.constants import STEP_SOURCE_PARAMETER_NAME
from zenml.constants import (
    ENV_ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK,
    STEP_SOURCE_PARAMETER_NAME,
    handle_bool_env_var,
)
from zenml.exceptions import MissingStepParameterError, StepInterfaceError
from zenml.logger import get_logger
from zenml.materializers.base_materializer import BaseMaterializer
@@ -52,6 +56,7 @@
)
from zenml.steps.utils import (
    resolve_type_annotation,
    run_as_single_step_pipeline,
)
from zenml.utils import (
dict_utils,
@@ -587,9 +592,16 @@ def __call__(
        from zenml.new.pipelines.pipeline import Pipeline

        if not Pipeline.ACTIVE_PIPELINE:
            # The step is being called outside the context of a pipeline,
            # we simply call the entrypoint
            return self.call_entrypoint(*args, **kwargs)
            # The step is being called outside the context of a pipeline, either
            # run the step function or run it as a single step pipeline on the
            # active stack
            run_without_stack = handle_bool_env_var(
                ENV_ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK, default=False
            )
            if run_without_stack:
                return self.call_entrypoint(*args, **kwargs)
            else:
                return run_as_single_step_pipeline(self, *args, **kwargs)

        (
            input_artifacts,
96 changes: 93 additions & 3 deletions src/zenml/steps/utils.py
@@ -18,20 +18,25 @@
import contextlib
import inspect
import textwrap
from typing import Any, Callable, Dict, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Tuple, Union
from uuid import UUID

from pydantic import BaseModel
from typing_extensions import Annotated

from zenml.artifacts.artifact_config import ArtifactConfig
from zenml.client import Client
from zenml.enums import MetadataResourceTypes
from zenml.enums import ExecutionStatus, MetadataResourceTypes
from zenml.exceptions import StepInterfaceError
from zenml.logger import get_logger
from zenml.metadata.metadata_types import MetadataType
from zenml.new.steps.step_context import get_step_context
from zenml.steps.step_output import Output
from zenml.utils import source_code_utils, typing_utils
from zenml.utils import settings_utils, source_code_utils, typing_utils

if TYPE_CHECKING:
    from zenml.steps import BaseStep


logger = get_logger(__name__)

@@ -486,3 +491,88 @@ def log_step_metadata(
        resource_id=step_run_id,
        resource_type=MetadataResourceTypes.STEP_RUN,
    )


def run_as_single_step_pipeline(
    __step: "BaseStep", *args: Any, **kwargs: Any
) -> Any:
    """Runs the step as a single step pipeline.

    - All inputs that are not JSON serializable will be uploaded to the
      artifact store before the pipeline is executed.
    - All output artifacts of the step will be loaded using the materializer
      that was used to store them.

    Args:
        *args: Entrypoint function arguments.
        **kwargs: Entrypoint function keyword arguments.

    Raises:
        RuntimeError: If the step execution failed.
        StepInterfaceError: If the arguments to the entrypoint function are
            invalid.

    Returns:
        The output of the step entrypoint function.
    """
    from zenml import ExternalArtifact, pipeline
    from zenml.config.base_settings import BaseSettings
    from zenml.new.pipelines.run_utils import (
        wait_for_pipeline_run_to_finish,
    )

    logger.info(
        "Running single step pipeline to execute step `%s`", __step.name
    )

    try:
        validated_arguments = (
            inspect.signature(__step.entrypoint)
            .bind(*args, **kwargs)
            .arguments
        )
    except TypeError as e:
        raise StepInterfaceError(
            "Invalid step function entrypoint arguments. Check out the "
            "error above for more details."
        ) from e

    inputs: Dict[str, Any] = {}
    for key, value in validated_arguments.items():
        try:
            __step.entrypoint_definition.validate_input(key=key, value=value)
            inputs[key] = value
        except Exception:
            inputs[key] = ExternalArtifact(value=value)

    orchestrator = Client().active_stack.orchestrator

    pipeline_settings: Any = {}
    if "synchronous" in orchestrator.config.model_fields:
        # Make sure the orchestrator runs sync so we stream the logs
        key = settings_utils.get_stack_component_setting_key(orchestrator)
        pipeline_settings[key] = BaseSettings(synchronous=True)

    @pipeline(name=__step.name, enable_cache=False, settings=pipeline_settings)
    def single_step_pipeline() -> None:
        __step(**inputs)

    run = single_step_pipeline.with_options(unlisted=True)()
    run = wait_for_pipeline_run_to_finish(run.id)

    if run.status != ExecutionStatus.COMPLETED:
        raise RuntimeError(f"Failed to execute step {__step.name}.")

    # Load output artifacts
    step_run = next(iter(run.steps.values()))
    outputs = [
        step_run.outputs[output_name].load()
        for output_name in step_run.config.outputs.keys()
    ]

    if len(outputs) == 0:
        return None
    elif len(outputs) == 1:
        return outputs[0]
    else:
        return tuple(outputs)
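The return-value handling at the end of `run_as_single_step_pipeline` (no outputs give `None`, a single output the bare value, several a tuple) can be sketched in isolation; `collapse_outputs` is a hypothetical name for this illustration:

```python
from typing import Any, List

def collapse_outputs(outputs: List[Any]) -> Any:
    # Mirrors how a single-step pipeline's outputs are handed back to
    # the caller: none -> None, one -> the bare value, several -> tuple.
    if len(outputs) == 0:
        return None
    elif len(outputs) == 1:
        return outputs[0]
    else:
        return tuple(outputs)
```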
8 changes: 5 additions & 3 deletions tests/unit/materializers/test_materializer_registry.py
@@ -29,7 +29,7 @@ def some_step() -> MyFloatType:
        return MyFloatType(3.0)

    with does_not_raise():
        some_step()()
        some_step().call_entrypoint()


def test_materializer_with_parameter_with_more_than_one_baseclass():
@@ -47,7 +47,7 @@ def some_step() -> MyFloatType:
        return MyFloatType(3.0)

    with does_not_raise():
        some_step()()
        some_step().call_entrypoint()


class MyFirstType:
@@ -107,4 +107,6 @@ def some_step() -> MyConflictingType:
        return MyConflictingType()

    with does_not_raise():
        some_step().configure(output_materializers=MyFirstMaterializer)()
        some_step().configure(
            output_materializers=MyFirstMaterializer
        ).call_entrypoint()
