diff --git a/docs/book/how-to/build-pipelines/run-an-individual-step.md b/docs/book/how-to/build-pipelines/run-an-individual-step.md
new file mode 100644
index 00000000000..a3f6f402891
--- /dev/null
+++ b/docs/book/how-to/build-pipelines/run-an-individual-step.md
@@ -0,0 +1,71 @@
+# Run an individual step on your stack
+
+If you want to run just an individual step on your stack, you can simply call the step
+as you would with a normal Python function. ZenML will internally create a pipeline with just your step
+and run it on the active stack.
+
+{% hint style="info" %}
+The pipeline run that executes your step will be `unlisted`, which means it
+will not be associated with any pipeline. You can still see it in the "Runs" tab of the
+dashboard.
+{% endhint %}
+
+```python
+from typing import Tuple
+
+import pandas as pd
+from sklearn.base import ClassifierMixin
+from sklearn.svm import SVC
+from typing_extensions import Annotated
+
+from zenml import step
+
+# Configure the step to use a step operator. If you're not using
+# a step operator, you can remove this and the step will run on
+# your orchestrator instead.
+@step(step_operator="<STEP_OPERATOR_NAME>")
+def svc_trainer(
+    X_train: pd.DataFrame,
+    y_train: pd.Series,
+    gamma: float = 0.001,
+) -> Tuple[
+    Annotated[ClassifierMixin, "trained_model"],
+    Annotated[float, "training_acc"],
+]:
+    """Train a sklearn SVC classifier."""
+
+    model = SVC(gamma=gamma)
+    model.fit(X_train.to_numpy(), y_train.to_numpy())
+
+    train_acc = model.score(X_train.to_numpy(), y_train.to_numpy())
+    print(f"Train accuracy: {train_acc}")
+
+    return model, train_acc
+
+
+X_train = pd.DataFrame(...)
+y_train = pd.Series(...)
+
+# Call the step directly. This will internally create a
+# pipeline with just this step, which will be executed on
+# the active stack.
+model, train_acc = svc_trainer(X_train=X_train, y_train=y_train)
+```
+
+## Run the underlying step function directly
+
+If you instead want to run your step function without ZenML getting involved, you
+can use the `entrypoint(...)` method of a step:
+
+```python
+X_train = pd.DataFrame(...)
+y_train = pd.Series(...)
+
+model, train_acc = svc_trainer.entrypoint(X_train=X_train, y_train=y_train)
+```
+
+{% hint style="info" %}
+If you want to make this the default behavior when calling a step, you
+can set the `ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK` environment variable to `True`.
+Once you do that, calling `svc_trainer(...)` will simply call the underlying function and
+not use your ZenML stack.
+{% endhint %}
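+
+As a minimal sketch (reusing the `svc_trainer` step and training data from above),
+you could opt in to this behavior for the current process like this:
+
+```python
+import os
+
+# Opt out of running single steps as pipelines on your stack.
+os.environ["ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK"] = "True"
+
+# This now calls the underlying function directly and does not
+# create a pipeline run on your active stack.
+model, train_acc = svc_trainer(X_train=X_train, y_train=y_train)
+```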
+
+<figure><img alt="ZenML Scarf" src="..."></figure>
\ No newline at end of file
diff --git a/docs/book/how-to/run-remote-pipelines-from-notebooks/README.md b/docs/book/how-to/run-remote-steps-and-pipelines-from-notebooks/README.md
similarity index 67%
rename from docs/book/how-to/run-remote-pipelines-from-notebooks/README.md
rename to docs/book/how-to/run-remote-steps-and-pipelines-from-notebooks/README.md
index 72cd628c5f1..77790e14498 100644
--- a/docs/book/how-to/run-remote-pipelines-from-notebooks/README.md
+++ b/docs/book/how-to/run-remote-steps-and-pipelines-from-notebooks/README.md
@@ -8,7 +8,7 @@
 ZenML steps and pipelines can be defined in a Jupyter notebook and executed remotely.
 
 Learn more about it in the following sections:
 
-| Define steps in notebook cells | define-steps-in-notebook-cells.md |
+| Limitations of defining steps in notebook cells | limitations-of-defining-steps-in-notebook-cells.md |
+| Run a single step from a notebook | run-a-single-step-from-a-notebook.md |
 
 <figure><img alt="ZenML Scarf" src="..."></figure>
diff --git a/docs/book/how-to/run-remote-pipelines-from-notebooks/define-steps-in-notebook-cells.md b/docs/book/how-to/run-remote-steps-and-pipelines-from-notebooks/limitations-of-defining-steps-in-notebook-cells.md
similarity index 94%
rename from docs/book/how-to/run-remote-pipelines-from-notebooks/define-steps-in-notebook-cells.md
rename to docs/book/how-to/run-remote-steps-and-pipelines-from-notebooks/limitations-of-defining-steps-in-notebook-cells.md
index 1cceabe9a9d..8e41510653b 100644
--- a/docs/book/how-to/run-remote-pipelines-from-notebooks/define-steps-in-notebook-cells.md
+++ b/docs/book/how-to/run-remote-steps-and-pipelines-from-notebooks/limitations-of-defining-steps-in-notebook-cells.md
@@ -1,5 +1,4 @@
-
-# Define steps in notebook cells
+# Limitations of defining steps in notebook cells
 
 If you want to run ZenML steps defined in notebook cells remotely (either with a remote [orchestrator](../../component-guide/orchestrators/orchestrators.md) or [step operator](../../component-guide/step-operators/step-operators.md)), the cells defining your steps must meet the following conditions:
 - The cell can only contain Python code, no Jupyter magic commands or shell commands starting with a `%` or `!`.
diff --git a/docs/book/how-to/run-remote-steps-and-pipelines-from-notebooks/run-a-single-step-from-a-notebook.md b/docs/book/how-to/run-remote-steps-and-pipelines-from-notebooks/run-a-single-step-from-a-notebook.md
new file mode 100644
index 00000000000..8d0439f9f72
--- /dev/null
+++ b/docs/book/how-to/run-remote-steps-and-pipelines-from-notebooks/run-a-single-step-from-a-notebook.md
@@ -0,0 +1,52 @@
+# Run a single step from a notebook
+
+If you want to run just a single step remotely from a notebook, you can simply call the step
+as you would with a normal Python function. ZenML will internally create a pipeline with just your step
+and run it on the active stack.
+
+{% hint style="warning" %}
+When defining a step that should be run remotely in a notebook, make sure you're
+aware of all the [limitations](limitations-of-defining-steps-in-notebook-cells.md) that apply.
+{% endhint %}
+
+```python
+from typing import Tuple
+
+import pandas as pd
+from sklearn.base import ClassifierMixin
+from sklearn.svm import SVC
+from typing_extensions import Annotated
+
+from zenml import step
+
+# Configure the step to use a step operator. If you're not using
+# a step operator, you can remove this and the step will run on
+# your orchestrator instead.
+@step(step_operator="<STEP_OPERATOR_NAME>")
+def svc_trainer(
+    X_train: pd.DataFrame,
+    y_train: pd.Series,
+    gamma: float = 0.001,
+) -> Tuple[
+    Annotated[ClassifierMixin, "trained_model"],
+    Annotated[float, "training_acc"],
+]:
+    """Train a sklearn SVC classifier."""
+
+    model = SVC(gamma=gamma)
+    model.fit(X_train.to_numpy(), y_train.to_numpy())
+
+    train_acc = model.score(X_train.to_numpy(), y_train.to_numpy())
+    print(f"Train accuracy: {train_acc}")
+
+    return model, train_acc
+
+
+X_train = pd.DataFrame(...)
+y_train = pd.Series(...)
+
+# Call the step directly. This will internally create a
+# pipeline with just this step, which will be executed on
+# the active stack.
+model, train_acc = svc_trainer(X_train=X_train, y_train=y_train)
+```
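+
+If you'd rather not hard-code the step operator in the decorator, a step can also
+be configured right before the call. A minimal sketch, assuming a step operator
+named `sagemaker` (a hypothetical name) is registered in your active stack:
+
+```python
+# "sagemaker" is a placeholder: use the name of a step operator
+# that is actually registered in your active stack.
+svc_trainer.configure(step_operator="sagemaker")
+
+model, train_acc = svc_trainer(X_train=X_train, y_train=y_train)
+```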
+
+<figure><img alt="ZenML Scarf" src="..."></figure>
\ No newline at end of file
diff --git a/docs/book/toc.md b/docs/book/toc.md
index 1d61cc818d7..686eb467e25 100644
--- a/docs/book/toc.md
+++ b/docs/book/toc.md
@@ -84,6 +84,7 @@
   * [Use failure/success hooks](how-to/build-pipelines/use-failure-success-hooks.md)
   * [Hyperparameter tuning](how-to/build-pipelines/hyper-parameter-tuning.md)
   * [Access secrets in a step](how-to/build-pipelines/access-secrets-in-a-step.md)
+  * [Run an individual step](how-to/build-pipelines/run-an-individual-step.md)
   * [Fetching pipelines](how-to/build-pipelines/fetching-pipelines.md)
   * [Get past pipeline/step runs](how-to/build-pipelines/get-past-pipeline-step-runs.md)
 * [🚨 Trigger a pipeline](how-to/trigger-pipelines/README.md)
@@ -108,6 +109,9 @@
   * [Which files are built into the image](how-to/customize-docker-builds/which-files-are-built-into-the-image.md)
   * [Use code repositories to automate Docker build reuse](how-to/customize-docker-builds/use-code-repositories-to-speed-up-docker-build-times.md)
   * [Define where an image is built](how-to/customize-docker-builds/define-where-an-image-is-built.md)
+* [📔 Run remote pipelines from notebooks](how-to/run-remote-steps-and-pipelines-from-notebooks/README.md)
+  * [Limitations of defining steps in notebook cells](how-to/run-remote-steps-and-pipelines-from-notebooks/limitations-of-defining-steps-in-notebook-cells.md)
+  * [Run a single step from a notebook](how-to/run-remote-steps-and-pipelines-from-notebooks/run-a-single-step-from-a-notebook.md)
 * [⚒️ Manage stacks & components](how-to/stack-deployment/README.md)
   * [Deploy a cloud stack with ZenML](how-to/stack-deployment/deploy-a-cloud-stack.md)
   * [Deploy a cloud stack with Terraform](how-to/stack-deployment/deploy-a-cloud-stack-with-terraform.md)
@@ -180,8 +184,6 @@
 * [🔌 Connect to a server](how-to/connecting-to-zenml/README.md)
   * [Connect in with your User (interactive)](how-to/connecting-to-zenml/connect-in-with-your-user-interactive.md)
   * [Connect with a Service Account](how-to/connecting-to-zenml/connect-with-a-service-account.md)
-* [📔 Run remote pipelines from notebooks](how-to/run-remote-pipelines-from-notebooks/README.md)
-  * [Define steps in notebook cells](how-to/run-remote-pipelines-from-notebooks/define-steps-in-notebook-cells.md)
 * [🔐 Interact with secrets](how-to/interact-with-secrets.md)
 * [🐞 Debug and solve issues](how-to/debug-and-solve-issues.md)
diff --git a/docs/book/user-guide/starter-guide/create-an-ml-pipeline.md b/docs/book/user-guide/starter-guide/create-an-ml-pipeline.md
index a662a32754b..3d9d0f60779 100644
--- a/docs/book/user-guide/starter-guide/create-an-ml-pipeline.md
+++ b/docs/book/user-guide/starter-guide/create-an-ml-pipeline.md
@@ -194,10 +194,10 @@ def svc_trainer(
 ```
 
 {% hint style="info" %}
-If you want to run the step function outside the context of a ZenML pipeline, all you need to do is call the step function outside of a ZenML pipeline. For example:
+If you want to run just a single step on your ZenML stack, all you need to do is call the step function outside of a ZenML pipeline. For example:
 
 ```python
-svc_trainer(X_train=..., y_train=...)
+model, train_acc = svc_trainer(X_train=..., y_train=...)
 ```
 {% endhint %}
diff --git a/src/zenml/constants.py b/src/zenml/constants.py
index cf73ce691f7..85840a1b72a 100644
--- a/src/zenml/constants.py
+++ b/src/zenml/constants.py
@@ -185,6 +185,9 @@ def handle_int_env_var(var: str, default: int = 0) -> int:
     f"{ENV_ZENML_SERVER_PREFIX}USE_LEGACY_DASHBOARD"
 )
 ENV_ZENML_SERVER_AUTO_ACTIVATE = f"{ENV_ZENML_SERVER_PREFIX}AUTO_ACTIVATE"
+ENV_ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK = (
+    "ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK"
+)
 
 # Logging variables
 IS_DEBUG_ENV: bool = handle_bool_env_var(ENV_ZENML_DEBUG, default=False)
diff --git a/src/zenml/steps/base_step.py b/src/zenml/steps/base_step.py
index 13e9930e6fc..0f7bbc42c61 100644
--- a/src/zenml/steps/base_step.py
+++ b/src/zenml/steps/base_step.py
@@ -38,7 +38,11 @@
 from zenml.client_lazy_loader import ClientLazyLoader
 from zenml.config.retry_config import StepRetryConfig
 from zenml.config.source import Source
-from zenml.constants import STEP_SOURCE_PARAMETER_NAME
+from zenml.constants import (
+    ENV_ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK,
+    STEP_SOURCE_PARAMETER_NAME,
+    handle_bool_env_var,
+)
 from zenml.exceptions import MissingStepParameterError, StepInterfaceError
 from zenml.logger import get_logger
 from zenml.materializers.base_materializer import BaseMaterializer
@@ -51,6 +55,7 @@
 )
 from zenml.steps.utils import (
     resolve_type_annotation,
+    run_as_single_step_pipeline,
 )
 from zenml.utils import (
     dict_utils,
@@ -586,9 +591,16 @@ def __call__(
         from zenml.new.pipelines.pipeline import Pipeline
 
         if not Pipeline.ACTIVE_PIPELINE:
-            # The step is being called outside the context of a pipeline,
-            # we simply call the entrypoint
-            return self.call_entrypoint(*args, **kwargs)
+            # The step is being called outside the context of a pipeline:
+            # either run the step function directly or run it as a
+            # single-step pipeline on the active stack.
+            run_without_stack = handle_bool_env_var(
+                ENV_ZENML_RUN_SINGLE_STEPS_WITHOUT_STACK, default=False
+            )
+            if run_without_stack:
+                return self.call_entrypoint(*args, **kwargs)
+            else:
+                return run_as_single_step_pipeline(self, *args, **kwargs)
 
         (
             input_artifacts,
diff --git a/src/zenml/steps/utils.py b/src/zenml/steps/utils.py
index d0bd1610fce..16fafda1552 100644
--- a/src/zenml/steps/utils.py
+++ b/src/zenml/steps/utils.py
@@ -18,7 +18,7 @@
 import contextlib
 import inspect
 import textwrap
-from typing import Any, Callable, Dict, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Tuple, Union
 from uuid import UUID
 
 from pydantic import BaseModel
@@ -26,12 +26,17 @@
 from zenml.artifacts.artifact_config import ArtifactConfig
 from zenml.client import Client
-from zenml.enums import MetadataResourceTypes
+from zenml.enums import ExecutionStatus, MetadataResourceTypes
+from zenml.exceptions import StepInterfaceError
 from zenml.logger import get_logger
 from zenml.metadata.metadata_types import MetadataType
 from zenml.new.steps.step_context import get_step_context
 from zenml.steps.step_output import Output
-from zenml.utils import source_code_utils, typing_utils
+from zenml.utils import settings_utils, source_code_utils, typing_utils
+
+if TYPE_CHECKING:
+    from zenml.steps import BaseStep
+
 
 logger = get_logger(__name__)
@@ -464,3 +469,88 @@
         resource_id=step_run_id,
         resource_type=MetadataResourceTypes.STEP_RUN,
     )
+
+
+def run_as_single_step_pipeline(
+    __step: "BaseStep", *args: Any, **kwargs: Any
+) -> Any:
+    """Runs the step as a single step pipeline.
+
+    - All inputs that are not JSON serializable will be uploaded to the
+      artifact store before the pipeline is executed.
+    - All output artifacts of the step will be loaded using the materializer
+      that was used to store them.
+
+    Args:
+        __step: The step to run as a single step pipeline.
+        *args: Entrypoint function arguments.
+        **kwargs: Entrypoint function keyword arguments.
+
+    Raises:
+        RuntimeError: If the step execution failed.
+        StepInterfaceError: If the arguments to the entrypoint function are
+            invalid.
+
+    Returns:
+        The output of the step entrypoint function.
+    """
+    from zenml import ExternalArtifact, pipeline
+    from zenml.config.base_settings import BaseSettings
+    from zenml.new.pipelines.run_utils import (
+        wait_for_pipeline_run_to_finish,
+    )
+
+    logger.info(
+        "Running single step pipeline to execute step `%s`", __step.name
+    )
+
+    try:
+        validated_arguments = (
+            inspect.signature(__step.entrypoint)
+            .bind(*args, **kwargs)
+            .arguments
+        )
+    except TypeError as e:
+        raise StepInterfaceError(
+            "Invalid step function entrypoint arguments. Check out the "
+            "error above for more details."
+        ) from e
+
+    inputs: Dict[str, Any] = {}
+    for key, value in validated_arguments.items():
+        try:
+            __step.entrypoint_definition.validate_input(key=key, value=value)
+            inputs[key] = value
+        except Exception:
+            inputs[key] = ExternalArtifact(value=value)
+
+    orchestrator = Client().active_stack.orchestrator
+
+    pipeline_settings: Any = {}
+    if "synchronous" in orchestrator.config.model_fields:
+        # Make sure the orchestrator runs sync so we stream the logs
+        key = settings_utils.get_stack_component_setting_key(orchestrator)
+        pipeline_settings[key] = BaseSettings(synchronous=True)
+
+    @pipeline(name=__step.name, enable_cache=False, settings=pipeline_settings)
+    def single_step_pipeline() -> None:
+        __step(**inputs)
+
+    run = single_step_pipeline.with_options(unlisted=True)()
+    run = wait_for_pipeline_run_to_finish(run.id)
+
+    if run.status != ExecutionStatus.COMPLETED:
+        raise RuntimeError(f"Failed to execute step {__step.name}.")
+
+    # Load output artifacts
+    step_run = next(iter(run.steps.values()))
+    outputs = [
+        step_run.outputs[output_name].load()
+        for output_name in step_run.config.outputs.keys()
+    ]
+
+    if len(outputs) == 0:
+        return None
+    elif len(outputs) == 1:
+        return outputs[0]
+    else:
+        return tuple(outputs)
diff --git a/tests/unit/materializers/test_materializer_registry.py b/tests/unit/materializers/test_materializer_registry.py
index 0ac87cc1569..f3ed2d4a745 100644
--- a/tests/unit/materializers/test_materializer_registry.py
+++ b/tests/unit/materializers/test_materializer_registry.py
@@ -29,7 +29,7 @@ def some_step() -> MyFloatType:
         return MyFloatType(3.0)
 
     with does_not_raise():
-        some_step()()
+        some_step().call_entrypoint()
 
 
 def test_materializer_with_parameter_with_more_than_one_baseclass():
@@ -47,7 +47,7 @@ def some_step() -> MyFloatType:
         return MyFloatType(3.0)
 
     with does_not_raise():
-        some_step()()
+        some_step().call_entrypoint()
 
 
 class MyFirstType:
@@ -107,4 +107,6 @@ def some_step() -> MyConflictingType:
         return MyConflictingType()
 
     with does_not_raise():
-        some_step().configure(output_materializers=MyFirstMaterializer)()
+        some_step().configure(
+            output_materializers=MyFirstMaterializer
+        ).call_entrypoint()
diff --git a/tests/unit/steps/test_base_step.py b/tests/unit/steps/test_base_step.py
index 1b9c3855b3f..e1498a6f5dc 100644
--- a/tests/unit/steps/test_base_step.py
+++ b/tests/unit/steps/test_base_step.py
@@ -239,31 +239,31 @@ def step_with_step_operator() -> None:
 def test_call_step_with_args(step_with_two_int_inputs):
     """Test that a step can be called with args."""
     with does_not_raise():
-        step_with_two_int_inputs()(1, 2)
+        step_with_two_int_inputs().call_entrypoint(1, 2)
 
 
 def test_call_step_with_kwargs(step_with_two_int_inputs):
     """Test that a step can be called with kwargs."""
     with does_not_raise():
-        step_with_two_int_inputs()(input_1=1, input_2=2)
+        step_with_two_int_inputs().call_entrypoint(input_1=1, input_2=2)
 
 
 def test_call_step_with_args_and_kwargs(step_with_two_int_inputs):
     """Test that a step can be called with a mix of args and kwargs."""
     with does_not_raise():
-        step_with_two_int_inputs()(1, input_2=2)
+        step_with_two_int_inputs().call_entrypoint(1, input_2=2)
 
 
 def test_call_step_with_too_many_args(step_with_two_int_inputs):
     """Test that calling a step fails when too many args are passed."""
     with pytest.raises(StepInterfaceError):
-        step_with_two_int_inputs()(1, 2, 3)
+        step_with_two_int_inputs().call_entrypoint(1, 2, 3)
 
 
 def test_call_step_with_too_many_args_and_kwargs(step_with_two_int_inputs):
     """Test that calling a step fails when too many args and kwargs are passed."""
     with pytest.raises(StepInterfaceError):
-        step_with_two_int_inputs()(1, input_1=2, input_2=3)
+        step_with_two_int_inputs().call_entrypoint(1, input_1=2, input_2=3)
 
 
 def test_call_step_with_missing_key(step_with_two_int_inputs):
@@ -285,13 +285,15 @@ def test_call_step_with_unexpected_key(step_with_two_int_inputs):
 def test_call_step_with_wrong_arg_type(step_with_two_int_inputs):
     """Test that calling a step fails when an arg has a wrong type."""
     with pytest.raises(StepInterfaceError):
-        step_with_two_int_inputs()(1, "not_an_int")
+        step_with_two_int_inputs().call_entrypoint(1, "not_an_int")
 
 
 def test_call_step_with_wrong_kwarg_type(step_with_two_int_inputs):
     """Test that calling a step fails when a kwarg has a wrong type."""
     with pytest.raises(StepInterfaceError):
-        step_with_two_int_inputs()(input_1=1, input_2="not_an_int")
+        step_with_two_int_inputs().call_entrypoint(
+            input_1=1, input_2="not_an_int"
+        )
 
 
 class MyType:
@@ -310,7 +312,7 @@ def some_step() -> MyType:
         return MyType()
 
     with does_not_raise():
-        some_step()()
+        some_step().call_entrypoint()
 
 
 def test_step_uses_config_class_default_values_if_no_config_is_passed():
@@ -376,8 +378,8 @@ def my_step() -> None:
     step_instance = my_step()
 
     with does_not_raise():
-        step_instance()
-        step_instance()
+        step_instance.call_entrypoint()
+        step_instance.call_entrypoint()
 
 
 @step