Enable running a single step on the active stack #2942

Merged
merged 12 commits
Aug 27, 2024
71 changes: 71 additions & 0 deletions docs/book/how-to/build-pipelines/run-an-individual-step.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Run an individual step on your stack

If you want to run just an individual step on your stack, you can simply call the step
as you would a normal Python function. ZenML will internally create a pipeline with just your step
and run it on the active stack.

{% hint style="info" %}
The pipeline run that executes your step will be `unlisted`, which means it
is not associated with any pipeline. You can still see it in the "Runs" tab of the
dashboard.
{% endhint %}

```python
from typing import Tuple

import pandas as pd
from sklearn.base import ClassifierMixin
from sklearn.svm import SVC
from typing_extensions import Annotated

from zenml import step


# Configure the step to use a step operator. If you're not using
# a step operator, you can remove this and the step will run on
# your orchestrator instead.
@step(step_operator="<STEP_OPERATOR_NAME>")
def svc_trainer(
    X_train: pd.DataFrame,
    y_train: pd.Series,
    gamma: float = 0.001,
) -> Tuple[
    Annotated[ClassifierMixin, "trained_model"],
    Annotated[float, "training_acc"],
]:
    """Train a sklearn SVC classifier."""
    model = SVC(gamma=gamma)
    model.fit(X_train.to_numpy(), y_train.to_numpy())

    train_acc = model.score(X_train.to_numpy(), y_train.to_numpy())
    print(f"Train accuracy: {train_acc}")

    return model, train_acc


X_train = pd.DataFrame(...)
y_train = pd.Series(...)

# Call the step directly. This will internally create a
# pipeline with just this step, which will be executed on
# the active stack.
model, train_acc = svc_trainer(X_train=X_train, y_train=y_train)
```

## Run the underlying step function directly

If you instead want to run your step function without ZenML getting involved, you
can use the `entrypoint(...)` method of a step:

```python
X_train = pd.DataFrame(...)
y_train = pd.Series(...)

model, train_acc = svc_trainer.entrypoint(X_train=X_train, y_train=y_train)
```

{% hint style="info" %}
If you want to make this the default behavior when calling a step, you
can set the `ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK` environment variable to `True`.
Once you do that, calling `svc_trainer(...)` will simply call the underlying function and
not use your ZenML stack.
{% endhint %}
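For illustration, here is a minimal, self-contained sketch of reading such a flag before calling a step. `run_without_stack_enabled` is a hypothetical helper, not part of the ZenML API, and the exact set of truthy strings ZenML accepts is an assumption:

```python
import os


def run_without_stack_enabled() -> bool:
    # Hypothetical helper: interpret the flag the way a boolean
    # env-var reader typically would. The accepted truthy strings
    # here are an assumption, not ZenML's exact parsing.
    value = os.environ.get("ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK", "")
    return value.lower() in ("1", "y", "yes", "true")


# Set the flag before calling any steps, so that a call like
# `svc_trainer(...)` would invoke the plain function instead of
# running on the active stack.
os.environ["ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK"] = "True"
print(run_without_stack_enabled())  # True
```

Because the variable is read at call time, you can also set it from your shell (`export ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK=True`) before starting Python.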

<!-- For scarf -->
<figure><img alt="ZenML Scarf" referrerpolicy="no-referrer-when-downgrade" src="https://static.scarf.sh/a.png?x-pxid=f0b4f458-0a54-4fcd-aa95-d5ee424815bc" /></figure>
@@ -8,7 +8,7 @@ ZenML steps and pipelines can be defined in a Jupyter notebook and executed remo

Learn more about it in the following sections:

<table data-view="cards"><thead><tr><th></th><th></th><th></th><th data-hidden data-card-target data-type="content-ref"></th></tr></thead><tbody><tr><td>Define steps in notebook cells</td><td></td><td></td><td><a href="define-steps-in-notebook-cells.md">define-steps-in-notebook-cells.md</a></td></tr></tbody></table>
<table data-view="cards"><thead><tr><th></th><th></th><th></th><th data-hidden data-card-target data-type="content-ref"></th></tr></thead><tbody><tr><td>Limitations of defining steps in notebook cells</td><td></td><td></td><td><a href="limitations-of-defining-steps-in-notebook-cells.md">limitations-of-defining-steps-in-notebook-cells.md</a></td></tr><tr><td>Run a single step from a notebook</td><td></td><td></td><td><a href="run-a-single-step-from-a-notebook.md">run-a-single-step-from-a-notebook.md</a></td></tr></tbody></table>

<!-- For scarf -->
<figure><img alt="ZenML Scarf" referrerpolicy="no-referrer-when-downgrade" src="https://static.scarf.sh/a.png?x-pxid=f0b4f458-0a54-4fcd-aa95-d5ee424815bc" /></figure>
@@ -1,5 +1,4 @@

# Define steps in notebook cells
# Limitations of defining steps in notebook cells

If you want to run ZenML steps defined in notebook cells remotely (either with a remote [orchestrator](../../component-guide/orchestrators/orchestrators.md) or [step operator](../../component-guide/step-operators/step-operators.md)), the cells defining your steps must meet the following conditions:
- The cell can only contain python code, no Jupyter magic commands or shell commands starting with a `%` or `!`.
@@ -0,0 +1,52 @@
# Run a single step from a notebook

If you want to run just a single step remotely from a notebook, you can simply call the step
as you would with a normal Python function. ZenML will internally create a pipeline with just your step
and run it on the active stack.

{% hint style="warning" %}
When defining a step that should be run remotely in a notebook, make sure you're
aware of all the [limitations](limitations-of-defining-steps-in-notebook-cells.md) that apply.
{% endhint %}


```python
from typing import Tuple

import pandas as pd
from sklearn.base import ClassifierMixin
from sklearn.svm import SVC
from typing_extensions import Annotated

from zenml import step


# Configure the step to use a step operator. If you're not using
# a step operator, you can remove this and the step will run on
# your orchestrator instead.
@step(step_operator="<STEP_OPERATOR_NAME>")
def svc_trainer(
    X_train: pd.DataFrame,
    y_train: pd.Series,
    gamma: float = 0.001,
) -> Tuple[
    Annotated[ClassifierMixin, "trained_model"],
    Annotated[float, "training_acc"],
]:
    """Train a sklearn SVC classifier."""
    model = SVC(gamma=gamma)
    model.fit(X_train.to_numpy(), y_train.to_numpy())

    train_acc = model.score(X_train.to_numpy(), y_train.to_numpy())
    print(f"Train accuracy: {train_acc}")

    return model, train_acc


X_train = pd.DataFrame(...)
y_train = pd.Series(...)

# Call the step directly. This will internally create a
# pipeline with just this step, which will be executed on
# the active stack.
model, train_acc = svc_trainer(X_train=X_train, y_train=y_train)
```

<!-- For scarf -->
<figure><img alt="ZenML Scarf" referrerpolicy="no-referrer-when-downgrade" src="https://static.scarf.sh/a.png?x-pxid=f0b4f458-0a54-4fcd-aa95-d5ee424815bc" /></figure>
6 changes: 4 additions & 2 deletions docs/book/toc.md
@@ -84,6 +84,7 @@
* [Use failure/success hooks](how-to/build-pipelines/use-failure-success-hooks.md)
* [Hyperparameter tuning](how-to/build-pipelines/hyper-parameter-tuning.md)
* [Access secrets in a step](how-to/build-pipelines/access-secrets-in-a-step.md)
* [Run an individual step](how-to/build-pipelines/run-an-individual-step.md)
* [Fetching pipelines](how-to/build-pipelines/fetching-pipelines.md)
* [Get past pipeline/step runs](how-to/build-pipelines/get-past-pipeline-step-runs.md)
* [🚨 Trigger a pipeline](how-to/trigger-pipelines/README.md)
@@ -108,6 +109,9 @@
* [Which files are built into the image](how-to/customize-docker-builds/which-files-are-built-into-the-image.md)
* [Use code repositories to automate Docker build reuse](how-to/customize-docker-builds/use-code-repositories-to-speed-up-docker-build-times.md)
* [Define where an image is built](how-to/customize-docker-builds/define-where-an-image-is-built.md)
* [📔 Run remote pipelines from notebooks](how-to/run-remote-steps-and-pipelines-from-notebooks/README.md)
* [Limitations of defining steps in notebook cells](how-to/run-remote-steps-and-pipelines-from-notebooks/limitations-of-defining-steps-in-notebook-cells.md)
* [Run a single step from a notebook](how-to/run-remote-steps-and-pipelines-from-notebooks/run-a-single-step-from-a-notebook.md)
* [⚒️ Manage stacks & components](how-to/stack-deployment/README.md)
* [Deploy a cloud stack with ZenML](how-to/stack-deployment/deploy-a-cloud-stack.md)
* [Deploy a cloud stack with Terraform](how-to/stack-deployment/deploy-a-cloud-stack-with-terraform.md)
@@ -180,8 +184,6 @@
* [🔌 Connect to a server](how-to/connecting-to-zenml/README.md)
* [Connect in with your User (interactive)](how-to/connecting-to-zenml/connect-in-with-your-user-interactive.md)
* [Connect with a Service Account](how-to/connecting-to-zenml/connect-with-a-service-account.md)
* [📔 Run remote pipelines from notebooks](how-to/run-remote-pipelines-from-notebooks/README.md)
* [Define steps in notebook cells](how-to/run-remote-pipelines-from-notebooks/define-steps-in-notebook-cells.md)
* [🔐 Interact with secrets](how-to/interact-with-secrets.md)
* [🐞 Debug and solve issues](how-to/debug-and-solve-issues.md)

4 changes: 2 additions & 2 deletions docs/book/user-guide/starter-guide/create-an-ml-pipeline.md
@@ -194,10 +194,10 @@ def svc_trainer(
```

{% hint style="info" %}
If you want to run the step function outside the context of a ZenML pipeline, all you need to do is call the step function outside of a ZenML pipeline. For example:
If you want to run just a single step on your ZenML stack, all you need to do is call the step function outside of a ZenML pipeline. For example:

```python
svc_trainer(X_train=..., y_train=...)
model, train_acc = svc_trainer(X_train=..., y_train=...)
```
{% endhint %}

3 changes: 3 additions & 0 deletions src/zenml/constants.py
@@ -185,6 +185,9 @@ def handle_int_env_var(var: str, default: int = 0) -> int:
    f"{ENV_ZENML_SERVER_PREFIX}USE_LEGACY_DASHBOARD"
)
ENV_ZENML_SERVER_AUTO_ACTIVATE = f"{ENV_ZENML_SERVER_PREFIX}AUTO_ACTIVATE"
ENV_ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK = (
    "ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK"
)

# Logging variables
IS_DEBUG_ENV: bool = handle_bool_env_var(ENV_ZENML_DEBUG, default=False)
20 changes: 16 additions & 4 deletions src/zenml/steps/base_step.py
@@ -38,7 +38,11 @@
from zenml.client_lazy_loader import ClientLazyLoader
from zenml.config.retry_config import StepRetryConfig
from zenml.config.source import Source
from zenml.constants import STEP_SOURCE_PARAMETER_NAME
from zenml.constants import (
    ENV_ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK,
    STEP_SOURCE_PARAMETER_NAME,
    handle_bool_env_var,
)
from zenml.exceptions import MissingStepParameterError, StepInterfaceError
from zenml.logger import get_logger
from zenml.materializers.base_materializer import BaseMaterializer
@@ -51,6 +55,7 @@
)
from zenml.steps.utils import (
    resolve_type_annotation,
    run_as_single_step_pipeline,
)
from zenml.utils import (
    dict_utils,
@@ -586,9 +591,16 @@ def __call__(
        from zenml.new.pipelines.pipeline import Pipeline

        if not Pipeline.ACTIVE_PIPELINE:
            # The step is being called outside the context of a pipeline,
            # we simply call the entrypoint
            return self.call_entrypoint(*args, **kwargs)
            # The step is being called outside the context of a pipeline, either
            # run the step function or run it as a single step pipeline on the
            # active stack
            run_without_stack = handle_bool_env_var(
                ENV_ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK, default=False
            )
            if run_without_stack:
                return self.call_entrypoint(*args, **kwargs)
            else:
                return run_as_single_step_pipeline(self, *args, **kwargs)

        (
            input_artifacts,
96 changes: 93 additions & 3 deletions src/zenml/steps/utils.py
@@ -18,20 +18,25 @@
import contextlib
import inspect
import textwrap
from typing import Any, Callable, Dict, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Tuple, Union
from uuid import UUID

from pydantic import BaseModel
from typing_extensions import Annotated

from zenml.artifacts.artifact_config import ArtifactConfig
from zenml.client import Client
from zenml.enums import MetadataResourceTypes
from zenml.enums import ExecutionStatus, MetadataResourceTypes
from zenml.exceptions import StepInterfaceError
from zenml.logger import get_logger
from zenml.metadata.metadata_types import MetadataType
from zenml.new.steps.step_context import get_step_context
from zenml.steps.step_output import Output
from zenml.utils import source_code_utils, typing_utils
from zenml.utils import settings_utils, source_code_utils, typing_utils

if TYPE_CHECKING:
    from zenml.steps import BaseStep


logger = get_logger(__name__)

@@ -464,3 +469,88 @@ def log_step_metadata(
        resource_id=step_run_id,
        resource_type=MetadataResourceTypes.STEP_RUN,
    )


def run_as_single_step_pipeline(
    __step: "BaseStep", *args: Any, **kwargs: Any
) -> Any:
    """Runs the step as a single step pipeline.

    - All inputs that are not JSON serializable will be uploaded to the
      artifact store before the pipeline is executed.
    - All output artifacts of the step will be loaded using the materializer
      that was used to store them.

    Args:
        __step: The step to run.
        *args: Entrypoint function arguments.
        **kwargs: Entrypoint function keyword arguments.

    Raises:
        RuntimeError: If the step execution failed.
        StepInterfaceError: If the arguments to the entrypoint function are
            invalid.

    Returns:
        The output of the step entrypoint function.
    """
    from zenml import ExternalArtifact, pipeline
    from zenml.config.base_settings import BaseSettings
    from zenml.new.pipelines.run_utils import (
        wait_for_pipeline_run_to_finish,
    )

    logger.info(
        "Running single step pipeline to execute step `%s`", __step.name
    )

    try:
        validated_arguments = (
            inspect.signature(__step.entrypoint)
            .bind(*args, **kwargs)
            .arguments
        )
    except TypeError as e:
        raise StepInterfaceError(
            "Invalid step function entrypoint arguments. Check out the "
            "error above for more details."
        ) from e

    inputs: Dict[str, Any] = {}
    for key, value in validated_arguments.items():
        try:
            __step.entrypoint_definition.validate_input(key=key, value=value)
            inputs[key] = value
        except Exception:
            inputs[key] = ExternalArtifact(value=value)

    orchestrator = Client().active_stack.orchestrator

    pipeline_settings: Any = {}
    if "synchronous" in orchestrator.config.model_fields:
        # Make sure the orchestrator runs sync so we stream the logs
        key = settings_utils.get_stack_component_setting_key(orchestrator)
        pipeline_settings[key] = BaseSettings(synchronous=True)

    @pipeline(name=__step.name, enable_cache=False, settings=pipeline_settings)
    def single_step_pipeline() -> None:
        __step(**inputs)

    run = single_step_pipeline.with_options(unlisted=True)()
    run = wait_for_pipeline_run_to_finish(run.id)

    if run.status != ExecutionStatus.COMPLETED:
        raise RuntimeError(f"Failed to execute step {__step.name}.")

    # Load output artifacts
    step_run = next(iter(run.steps.values()))
    outputs = [
        step_run.outputs[output_name].load()
        for output_name in step_run.config.outputs.keys()
    ]

    if len(outputs) == 0:
        return None
    elif len(outputs) == 1:
        return outputs[0]
    else:
        return tuple(outputs)
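The input-handling loop above can be illustrated with a small self-contained analogue. `split_inputs` is a hypothetical helper, not ZenML code, and it uses JSON serializability as a stand-in for `validate_input`: values that validate stay plain parameters, everything else would be wrapped as an `ExternalArtifact` and uploaded to the artifact store.

```python
import json
from typing import Any, Dict


def split_inputs(arguments: Dict[str, Any]) -> Dict[str, str]:
    """Classify each argument the way the loop above does."""
    kinds: Dict[str, str] = {}
    for key, value in arguments.items():
        try:
            json.dumps(value)  # stand-in for ZenML's input validation
            kinds[key] = "parameter"
        except TypeError:
            kinds[key] = "external_artifact"  # would be uploaded as an artifact
    return kinds


print(split_inputs({"gamma": 0.001, "X_train": object()}))
# {'gamma': 'parameter', 'X_train': 'external_artifact'}
```

This is also why calling a step this way works with arbitrary in-memory objects such as DataFrames: anything that cannot be passed as a plain parameter is shipped through the artifact store instead.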
8 changes: 5 additions & 3 deletions tests/unit/materializers/test_materializer_registry.py
@@ -29,7 +29,7 @@ def some_step() -> MyFloatType:
        return MyFloatType(3.0)

    with does_not_raise():
        some_step()()
        some_step().call_entrypoint()


def test_materializer_with_parameter_with_more_than_one_baseclass():
@@ -47,7 +47,7 @@ def some_step() -> MyFloatType:
        return MyFloatType(3.0)

    with does_not_raise():
        some_step()()
        some_step().call_entrypoint()


class MyFirstType:
Expand Down Expand Up @@ -107,4 +107,6 @@ def some_step() -> MyConflictingType:
        return MyConflictingType()

    with does_not_raise():
        some_step().configure(output_materializers=MyFirstMaterializer)()
        some_step().configure(
            output_materializers=MyFirstMaterializer
        ).call_entrypoint()