diff --git a/docs/3.0rc/develop/write-flows.mdx b/docs/3.0rc/develop/write-flows.mdx index eb067e10198d..0e93c4c0aa1c 100644 --- a/docs/3.0rc/develop/write-flows.mdx +++ b/docs/3.0rc/develop/write-flows.mdx @@ -3,13 +3,13 @@ title: Write and run flows description: Learn the basics of defining and running flows. --- -Flows are the most central Prefect object. +Flows are the most central Prefect objects. A flow is a container for workflow logic as code. Flows are defined as Python functions. They can take inputs, perform work, and return an output. -You can turn any function into a Prefect flow by adding the `@flow` decorator to it: +Make any function a Prefect flow by adding the `@flow` decorator to it: ```python from prefect import flow @@ -20,12 +20,13 @@ def my_flow(): return ``` -When a function becomes a flow, its behavior changes, giving it the following capabilities: +When a function becomes a flow, its behavior changes. +Flows have the following capabilities: - All runs of the flow have persistent [state](/3.0rc/develop/manage-states/). Transitions between states are recorded, allowing you to observe and act on flow execution. -- Input arguments can be type validated as workflow parameters. -- Retries can be performed on failure. +- Input arguments can be type validated as workflow [parameters](/#specify-flow-parameters). +- [Retries](/#retries) can be performed on failure. - Timeouts can be enforced to prevent unintentional, long-running workflows. - Metadata about [flow runs](#flow-runs), such as run time and final state, is automatically tracked. - A flow can be [deployed](/3.0rc/deploy/infrastructure-examples/docker/), which exposes an API for interacting with it remotely. @@ -33,7 +34,6 @@ allowing you to observe and act on flow execution. Flows are uniquely identified by name. 
You can provide a `name` parameter value for the flow: - ```python @flow(name="My Flow") def my_flow(): @@ -42,12 +42,11 @@ def my_flow(): If you don't provide a name, Prefect uses the flow function name. -## Running flows +## Run flows A _flow run_ is a single execution of a flow. -You can create a flow run by calling a flow by its function name, just as you would a normal Python function. -For example, by running a script or importing the function into an interactive session and calling it. +Create a flow run by calling a flow by its function name, just as you would a normal Python function. You can also create a flow run by: @@ -56,19 +55,9 @@ You can also create a flow run by: - Starting a flow run for the deployment through a schedule, the Prefect UI, or the Prefect API However you run your flow, Prefect monitors the flow run, capturing its state for observability. +You can log a [variety of metadata](/3.0rc/develop/logging) about flow runs for monitoring, troubleshooting, and auditing purposes. - -**Logging** - -You can log a [variety of metadata](/3.0rc/develop/logging) about your flow runs for monitoring, troubleshooting, and auditing purposes. - - -### Example - -The script below fetches statistics about the [main Prefect repository](https://github.com/PrefectHQ/prefect). -(Note that [httpx](https://www.python-httpx.org/) is an HTTP client library and a dependency of Prefect.) - -Turn this function into a Prefect flow and run the script: +The example below uses the HTTPX client library to fetch statistics about the [main Prefect repository](https://github.com/PrefectHQ/prefect). 
```python repo_info.py import httpx @@ -85,6 +74,7 @@ def get_repo_info(): print(f"Stars 🌠 : {repo['stargazers_count']}") print(f"Forks 🍴 : {repo['forks_count']}") + if __name__ == "__main__": get_repo_info() ``` @@ -99,21 +89,15 @@ Forks 🍴 : 1245 12:47:45.008 | INFO | Flow run 'ludicrous-warthog' - Finished in state Completed() ``` - -**Flows can contain arbitrary Python code** - -As shown above, flow definitions can contain arbitrary Python code. - - -## Specifying parameters +## Specify flow parameters As with any Python function, you can pass arguments to a flow, including both positional and keyword arguments. These arguments defined on your flow function are called [parameters](/3.0rc/develop/write-flows/#parameters). They are stored by the Prefect orchestration engine on the flow run object. Prefect automatically performs type conversion of inputs using any provided type hints. -Type hints provide an easy way to enforce typing on your flow parameters and can be customized with [Pydantic](https://pydantic-docs.helpmanual.io/). -Prefect supports _any_ Pydantic model as a type hint for a flow parameter. +Type hints provide a simple way to enforce typing on your flow parameters and can be customized with [Pydantic](https://pydantic-docs.helpmanual.io/). +Prefect supports any Pydantic model as a type hint for a flow parameter. 
```python from prefect import flow @@ -125,23 +109,26 @@ class Model(BaseModel): b: float c: str + @flow def model_validator(model: Model): print(model) ``` -For example, to automatically convert an argument to a datetime: +For example, to automatically convert an argument to a datetime object: ```python from prefect import flow from datetime import datetime + @flow def what_day_is_it(date: Optional[datetime] = None): if date is None: date = datetime.now(timezone.utc) print(f"It was {date.strftime('%A')} on {date.isoformat()}") + if __name__ == "__main__": what_day_is_it("2021-01-01T02:00:19.180906") ``` @@ -152,8 +139,8 @@ When you run this flow, you'll see the following output: It was Friday on 2021-01-01T02:00:19.180906 ``` -Note that you can provide parameter values to a flow through the API using a [deployment](/3.0rc/deploy/infrastructure-examples/docker/). -Flow run parameters sent to the API on flow calls are coerced to the appropriate types. +Note that you can provide parameter values to a flow through the API using a [deployment](/3.0rc/deploy/). +Flow run parameters sent to the API are coerced to the appropriate types when possible. **Prefect API requires keyword arguments** @@ -163,14 +150,13 @@ The values passed cannot be positional. Parameters are validated before a flow is run. -If a flow call receives invalid parameters, a flow run is created in a `Failed` state. If a flow run for a deployment receives invalid parameters, it moves from a `Pending` state to a `Failed` state without entering a `Running` state. Flow run parameters cannot exceed `512kb` in size. 
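As a rough illustration of what the type conversion described above amounts to, the sketch below performs the same string-to-`datetime` conversion by hand using only the standard library. It is not Prefect's implementation (Prefect delegates validation to Pydantic); the function name `what_day_was_it` and the variables `raw`, `parsed`, and `message` are hypothetical, and the input string is the one used in the `what_day_is_it` example.

```python
from datetime import datetime


def what_day_was_it(date: datetime) -> str:
    # Plain function (no @flow): it expects an already-parsed datetime.
    return f"It was {date.strftime('%A')} on {date.isoformat()}"


# The raw value as it might arrive from a caller or an API payload.
raw = "2021-01-01T02:00:19.180906"

# Manual stand-in for the coercion a type hint would trigger in a flow.
parsed = datetime.fromisoformat(raw)

message = what_day_was_it(parsed)
print(message)
```

With a flow, the `date: datetime` type hint lets you pass the raw string directly and skip the explicit `fromisoformat` step.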
-## Composing flows +## Compose flows Flows can call [tasks](/3.0rc/develop/write-tasks), the most granular units of orchestrated work in Prefect workflows: @@ -182,6 +168,7 @@ from prefect import flow, task def print_hello(name): print(f"Hello {name}!") + @flow(name="Hello Flow") def hello_world(name="world"): print_hello(name) @@ -189,37 +176,27 @@ def hello_world(name="world"): A single flow function can contain all of your workflow's code. However, if you put all of your workflow logic in a single flow function and any line of code fails, the entire flow fails and must be retried from the beginning. -Organizing your workflow code into smaller flows and tasks lets you take advantage of Prefect features such as retries, -more granular visibility into runtime state, the ability to determine final state regardless of individual task state, and more. +The more granular you make your workflows, the better they can recover from failures and the easier you can find and fix issues. -You may call any number of tasks, other flows, and even regular Python functions within your flow. -You can pass parameters to your flow function to use elsewhere in the workflow, and Prefect will report on the progress -and [final state](#final-state-determination) of any invocation. - -We recommend writing atomic tasks. -Each task should be a single, discrete piece of work in your workflow, such as calling an API, performing a database operation, or transforming a data point. -The more granular you make your tasks, the better your workflows can recover from failures and the easier you can find and fix issues. Prefect tasks are well suited for parallel or distributed execution using distributed computation frameworks such as Dask or Ray. -### Nesting flows +### Nest flows -In addition to calling tasks within a flow, you can also call other flows. -A _nested_ flow run is created when a flow function is called by another flow. 
+In addition to calling tasks from a flow, flows can also call other flows. +A nested flow run is created when a flow function is called by another flow. When one flow calls another, the calling flow run is the "parent" run, the called flow run is the "child" run. -Nesting flows is a great way to organize your workflows and offer more visibility within the UI. In the UI, each child flow run is linked to its parent and can be individually observed. -For most purposes, nested flow runs behave just as all flow runs do. +For most purposes, nested flow runs behave just like unnested flow runs. There is a full representation of the nested flow run in the backend as if it had been called separately. Nested flow runs differ from normal flow runs in that they resolve any passed task futures into data. -This allows data to be passed from the parent flow run to the child run easily. +This allows data to be passed from the parent flow run to a nested flow run easily. When a nested flow run starts, it creates a new [task runner](/3.0rc/develop/task-runners/) for any tasks it contains. When the nested flow run completes, the task runner shuts down. Nested flow runs block execution of the parent flow run until completion. -However, asynchronous nested flows can run concurrently with [AnyIO task groups](https://anyio.readthedocs.io/en/stable/tasks.html) or -[asyncio.gather](https://docs.python.org/3/library/asyncio-task.html#id6). +However, asynchronous nested flows can run concurrently with [AnyIO task groups](https://anyio.readthedocs.io/en/stable/tasks.html) or [asyncio.gather](https://docs.python.org/3/library/asyncio-task.html#id6). The relationship between nested runs is recorded through a special task run in the parent flow run that represents the child flow run. The `state_details` field of the task run representing the child flow run includes a `child_flow_run_id`. @@ -229,7 +206,7 @@ You can define multiple flows within the same file. 
Whether running locally or through a [deployment](/3.0rc/deploy/infrastructure-examples/docker/), you must indicate which flow is the entrypoint for a flow run. -**Cancelling nested flow runs** +**Cancel nested flow runs** A nested flow run cannot be cancelled without cancelling its parent flow run. If you need to be able to cancel a nested flow run independent of its parent flow run, we recommend deploying it separately and starting it with @@ -242,16 +219,16 @@ You can also define flows or tasks in separate modules and import them for use: from prefect import flow, task -@flow(name="Subflow") -def my_subflow(msg): - print(f"Subflow says: {msg}") +@flow(name="Nestedflow") +def my_nested_flow(msg): + print(f"Nestedflow says: {msg}") ``` -Here's a parent flow that imports and uses `my_subflow()` as a subflow: +Here's a parent flow that imports and uses `my_nested_flow` as a nested flow: -```python +```python hello.py from prefect import flow, task -from subflow import my_subflow +from nested_flow import my_nested_flow @task(name="Print Hello") @@ -260,30 +237,33 @@ def print_hello(name): print(msg) return msg + @flow(name="Hello Flow") def hello_world(name="world"): message = print_hello(name) - my_subflow(message) + my_nested_flow(message) + if __name__=="__main__": hello_world("Marvin") ``` -Running the `hello_world()` flow (in this example from the file `hello.py`) creates a flow run like this: +Running the `hello_world()` flow creates a flow run like this: ```bash -$ python hello.py -15:19:21.651 | INFO | prefect.engine - Created flow run 'daft-cougar' for flow 'Hello Flow' -15:19:21.945 | INFO | Flow run 'daft-cougar' - Created task run 'Print Hello-84f0fe0e-0' for task 'Print Hello' +08:24:06.617 | INFO | prefect.engine - Created flow run 'sage-mongoose' for flow 'Hello Flow' +08:24:06.620 | INFO | prefect.engine - View at https://app.prefect.cloud/... 
+08:24:07.113 | INFO | Task run 'Print Hello-0' - Created task run 'Print Hello-0' for task 'Print Hello' Hello Marvin! -15:19:22.055 | INFO | Task run 'Print Hello-84f0fe0e-0' - Finished in state Completed() -15:19:22.107 | INFO | Flow run 'daft-cougar' - Created subflow run 'ninja-duck' for flow 'Subflow' -Subflow says: Hello Marvin! -15:19:22.794 | INFO | Flow run 'ninja-duck' - Finished in state Completed() -15:19:23.215 | INFO | Flow run 'daft-cougar' - Finished in state Completed('All states completed.') +08:24:07.445 | INFO | Task run 'Print Hello-0' - Finished in state Completed() +08:24:07.825 | INFO | Flow run 'sage-mongoose' - Created subflow run 'powerful-capybara' for flow 'Nestedflow' +08:24:07.826 | INFO | prefect.engine - View at https://app.prefect.cloud/... +Nestedflow says: Hello Marvin! +08:24:08.165 | INFO | Flow run 'powerful-capybara' - Finished in state Completed() +08:24:08.296 | INFO | Flow run 'sage-mongoose' - Finished in state Completed() ``` -Here are some scenarios where you should choose a nested flow rather than calling tasks individually: +Here are some scenarios where you might want to define a nested flow rather than call tasks individually: - Observability: Nested flows, like any other flow run, have first-class observability within the Prefect UI and Prefect Cloud. You'll see nested flows' status in the **Runs** dashboard rather than having to dig down into the tasks within a specific flow run. @@ -299,41 +279,28 @@ task runner for each nested flow. ## Supported functions Almost any standard Python function can be turned into a Prefect flow by adding the `@flow` decorator. +Flows are executed in the main thread by default to facilitate native Python debugging and profiling. - -Flows are always executed in the main thread by default to facilitate native Python debugging and profiling. - - -### Synchronous functions - -The simplest Prefect flow is a synchronous Python function. 
Here's an example of a synchronous flow that prints a message: - -```python -from prefect import flow - -@flow -def print_message(): - print("Hello, I'm a flow") - -print_message() -``` +As shown in the examples above, flows run synchronously by default. ### Asynchronous functions -Prefect also supports asynchronous functions. +Prefect also supports asynchronous execution. The resulting flows are coroutines that can be awaited or run concurrently, following [the standard rules of async Python](https://docs.python.org/3/library/asyncio-task.html). +For example: ```python import asyncio - from prefect import task, flow + @task async def print_values(values): for value in values: await asyncio.sleep(1) print(value, end=" ") + @flow async def async_flow(): print("Hello, I'm an async flow") @@ -345,13 +312,15 @@ async def async_flow(): coros = [print_values("abcd"), print_values("6789")] await asyncio.gather(*coros) -asyncio.run(async_flow()) + +if __name__ == "__main__": + asyncio.run(async_flow()) ``` ### Class methods Prefect supports synchronous and asynchronous class methods as flows, including instance methods, class methods, and static methods. 
-For class methods and static methods, you must apply the appropriate method decorator _above_ the `@flow` decorator: +For class methods and static methods, apply the appropriate method decorator _above_ the `@flow` decorator: ```python from prefect import flow @@ -363,16 +332,19 @@ class MyClass: def my_instance_method(self): pass + @classmethod @flow def my_class_method(cls): pass + @staticmethod @flow def my_static_method(): pass + MyClass().my_instance_method() MyClass.my_class_method() MyClass.my_static_method() @@ -398,6 +370,7 @@ def generator(): def consumer(x): print(x) + for val in generator(): consumer(val) ``` @@ -414,14 +387,17 @@ Here is an example of proactive generator consumption: ```python from prefect import flow + def gen(): yield from [1, 2, 3] print('Generator consumed!') + @flow def f(): return gen() - + + f() # prints 'Generator consumed!' ``` @@ -431,200 +407,31 @@ Values yielded from generator flows are not considered final results and do not ```python from prefect import flow + def gen(): yield from [1, 2, 3] print('Generator consumed!') + @flow def f(): yield gen - -generator = next(f()) -list(generator) # prints 'Generator consumed!' - -``` - - -## Parameters - -As with any Python function, you can pass arguments to a flow including both positional and keyword arguments. -These arguments defined on your flow function are called [parameters](/3.0rc/develop/write-flows/#parameters). -They are stored by the Prefect orchestration engine on the flow run object. - -Prefect automatically performs type conversion of inputs using any provided type hints. -Type hints provide an easy way to enforce typing on your flow parameters and can be greatly enhanced with [Pydantic](https://pydantic-docs.helpmanual.io/). 
-Prefect supports _any_ Pydantic model as a type hint within a flow is coerced automatically into the relevant object type: - -```python -from prefect import flow -from pydantic import BaseModel -class Model(BaseModel): - a: int - b: float - c: str -@flow -def model_validator(model: Model): - print(model) -``` - -For example, to automatically convert something to a datetime: - -```python -from prefect import flow -from datetime import datetime - -@flow -def what_day_is_it(date: Optional[datetime] = None): - if date is None: - date = datetime.now(timezone.utc) - print(f"It was {date.strftime('%A')} on {date.isoformat()}") - -if __name__ == "__main__": - what_day_is_it("2021-01-01T02:00:19.180906") -``` - -When you run this flow, you'll see the following output: - -```bash -It was Friday on 2021-01-01T02:00:19.180906 +generator = next(f()) +list(generator) # prints 'Generator consumed!' ``` - -Note that you can provide parameter values to a flow through the API using a [deployment](/3.0rc/deploy/infrastructure-examples/docker/). -Flow run parameters sent to the API on flow calls are coerced to the appropriate types. - - -**Prefect API requires keyword arguments** - -When creating flow runs from the Prefect API, you must specify parameter names when overriding defaults. -They cannot be positional. -Parameters are validated before a flow is run. -If a flow call receives invalid parameters, a flow run is created in a `Failed` state. -If a flow run for a deployment receives invalid parameters, it moves from a `Pending` state to `Failed` without entering a `Running` state. - - -Flow run parameters cannot exceed `512kb` in size. - ## Flow runs -A _flow run_ represents a single execution of the flow. - -You can create a flow run by calling the flow manually. -For example, by running a Python script or importing the flow into an interactive session and calling it. 
- -You can also create a flow run by: - -- Using external schedulers such as `cron` to invoke a flow function -- Creating a [deployment](/3.0rc/deploy/infrastructure-examples/docker/) on Prefect Cloud or a locally run Prefect server -- Creating a flow run for the deployment through a schedule, the Prefect UI, or the Prefect API - -However you run the flow, the Prefect API monitors the flow run, capturing flow run state for observability. - - -**Logging** - -Prefect enables you to log a variety of useful information about your flow and task runs. -You can capture information about your workflows for purposes such as monitoring, troubleshooting, and auditing. -Check out [Logging](/3.0rc/develop/logging) for more information. - - -When you run a flow that contains tasks or additional flows, Prefect tracks the relationship of each child run to the parent flow run. - - -**Retries** - -Unexpected errors may occur. For example the GitHub API may be temporarily unavailable or rate limited. -Check out [Transactions](/3.0rc/develop/transactions) to learn how to make your flows more resilient. - - -## Writing flows - -The `@flow` decorator is used to designate a flow: - -```python -from prefect import flow - -@flow -def my_flow(): - return -``` - -There are no rigid rules for what code you include within a flow definition. All valid Python is acceptable. - -Flows are uniquely identified by name. You can provide a `name` parameter value for the flow. -If you don't provide a name, Prefect uses the flow function name. - -```python -@flow(name="My Flow") -def my_flow(): - return -``` - -Flows can call tasks to allow Prefect to orchestrate and track more granular units of work: - -```python -from prefect import flow, task - -@task -def print_hello(name): - print(f"Hello {name}!") - -@flow(name="Hello Flow") -def hello_world(name="world"): - print_hello(name) -``` - - -**Flows and tasks** - -There's nothing stopping you from putting all of your code in a single flow function. 
- -However, organizing your workflow code into smaller flow and task units lets you take advantage of Prefect features like retries, -more granular visibility into runtime state, the ability to determine final state regardless of individual task state, and more. - -In addition, if you put all of your workflow logic in a single flow function and any line of code fails, the entire flow fails -and must be retried from the beginning. -You can avoid this by breaking up the code into multiple tasks. - -You may call any number of other tasks, subflows, and even regular Python functions within your flow. -You can pass parameters to your flow function to use elsewhere in the workflow, and Prefect will report on the progress -and [final state](#final-state-determination) of any invocation. - -Prefect encourages "small tasks." Each one should represent a single logical step of your workflow. -This allows Prefect to better contain task failures. - - -## Subflows - -In addition to calling tasks within a flow, you can also call other flows. -Child flows are called [subflows](/3.0rc/develop/write-flows/#composing-flows) and allow you to efficiently manage, -track, and version common multi-task logic. - -Subflows are a great way to organize your workflows and offer more visibility within the UI. - -Add a `flow` decorator to the `get_open_issues` function: +A _flow run_ is a single execution of a flow. -```python -@flow -def get_open_issues(repo_name: str, open_issues_count: int, per_page: int = 100): - issues = [] - pages = range(1, -(open_issues_count // -per_page) + 1) - for page in pages: - issues.append( - get_url.submit( - f"https://api.github.com/repos/{repo_name}/issues", - params={"page": page, "per_page": per_page, "state": "open"}, - ) - ) - return [i for p in issues for i in p.result()] -``` +You can create a flow run by calling the flow function manually, or even by using an external scheduler such as `cron` to invoke a flow function. 
+Most users run flows by creating a [deployment](/3.0rc/deploy/) on Prefect Cloud or Prefect server and then scheduling a flow run for the deployment through a schedule, the Prefect UI, or the Prefect API. -Whenever you run the parent flow, the subflow is called and runs. -In the UI, each subflow run is linked to its parent and can be individually inspected. +However you run a flow, the Prefect API monitors the flow run and records information for monitoring, troubleshooting, and auditing. ## Flow settings @@ -642,67 +449,61 @@ Flows can be configured by passing arguments to the decorator. Flows accept the | `validate_parameters` | Boolean indicating whether parameters passed to flows are validated by Pydantic. Default is `True`. | | `version` | An optional version string for the flow. If not provided, we will attempt to create a version string as a hash of the file containing the wrapped function. If the file cannot be located, the version will be null. | -For example, you can provide a `name` value for the flow. Here is the optional `description` argument -and a non-default task runner. +For example, you can provide `name` and `description` arguments. ```python from prefect import flow + @flow( - name="My Flow", - description="My flow with retries and linear backoff", - retries=3, - retry_delay_seconds=[10, 20, 30], -) + name="My Flow", description="My flow with a name and description", log_prints=True) def my_flow(): - return -``` + print("Hello, I'm a flow") -You can also provide the description as the docstring on the flow function. -```python -@flow( - name="My Flow", - retries=3, - retry_delay_seconds=[10, 20, 30], -) -def my_flow(): - """My flow with retries and linear backoff""" - return +if __name__ == "__main__": + my_flow() ``` -You can distinguish runs of this flow by providing a `flow_run_name`. -This setting accepts a string that can optionally contain templated references to the parameters of your flow. 
+If no description is provided, a flow function's docstring is used as the description. + +You can distinguish runs of a flow by passing a `flow_run_name`. +This parameter accepts a string that can contain templated references to the parameters of your flow. The name is formatted using Python's standard string formatting syntax: ```python import datetime from prefect import flow + @flow(flow_run_name="{name}-on-{date:%A}") def my_flow(name: str, date: datetime.datetime): pass + # creates a flow run called 'marvin-on-Thursday' -my_flow(name="marvin", date=datetime.datetime.now(datetime.timezone.utc)) +if __name__ == "__main__": + my_flow(name="marvin", date=datetime.datetime.now(datetime.timezone.utc)) ``` -Additionally this setting also accepts a function that returns a string for the flow run name: +This setting also accepts a function that returns a string for the flow run name: ```python import datetime from prefect import flow + def generate_flow_run_name(): date = datetime.datetime.now(datetime.timezone.utc) - return f"{date:%A}-is-a-nice-day" + @flow(flow_run_name=generate_flow_run_name) def my_flow(name: str): pass -# creates a flow run called 'Thursday-is-a-nice-day' + +# creates a flow run named 'Thursday-is-a-nice-day' if __name__ == "__main__": my_flow(name="marvin") ``` @@ -713,6 +514,7 @@ If you need access to information about the flow, use the `prefect.runtime` modu from prefect import flow from prefect.runtime import flow_run + def generate_flow_run_name(): flow_name = flow_run.flow_name @@ -722,105 +524,37 @@ def generate_flow_run_name(): return f"{flow_name}-with-{name}-and-{limit}" + @flow(flow_run_name=generate_flow_run_name) def my_flow(name: str, limit: int = 100): pass -# creates a flow run called 'my-flow-with-marvin-and-100' + +# creates a flow run named 'my-flow-with-marvin-and-100' if __name__ == "__main__": my_flow(name="marvin") ``` -Note that `validate_parameters` check that input values conform to the annotated types on the function. 
-Where possible, values are coerced into the correct type. For example, if a parameter is defined as `x: int` and "5" is passed, -it resolves to `5`. +Note that `validate_parameters` checks that input values conform to the annotated types on the function. +Where possible, values are coerced into the correct type. +For example, if a parameter is defined as `x: int` and the string **"5"** is passed, it resolves to `5`. If set to `False`, no validation is performed on flow parameters. ## Final state determination -States are a record of the status of a particular task or flow run. See the [manage states](/3.0rc/develop/manage-states) page for more information. +A state is a record of the status of a particular task run or flow run. +See the [manage states](/3.0rc/develop/manage-states) page for more information. -The final state of the flow is determined by its return value. The following rules apply: +The final state of the flow is determined by its return value. +The following rules apply: - If an exception is raised directly in the flow function, the flow run is marked as failed. -- If the flow does not return a value (or returns `None`), its state is determined by the states of all of the tasks and subflows within it. - - If _any_ task run or subflow run failed, then the final flow run state is marked as `FAILED`. - - If _any_ task run was cancelled, then the final flow run state is marked as `CANCELLED`. +- If the flow does not return a value (or returns `None`), its state is determined by the states of all of the tasks and nested flows within it. + - If _any_ task run or nested flow run fails, then the final flow run state is marked as `FAILED`. + - If _any_ task run is cancelled, then the final flow run state is marked as `CANCELLED`. - If a flow returns a manually created state, it is used as the state of the final flow run. This allows for manual determination of final state. - If the flow run returns _any other object_, then it is marked as completed. 
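The rules above can be read as a small decision procedure. The following is a toy model in plain Python, not Prefect's engine or API: `StateType`, `final_state`, and the `child_states` argument are hypothetical names introduced for illustration. It ignores futures, and it assumes that a failed child takes precedence over a cancelled one, which the list order suggests but does not state.

```python
from enum import Enum
from typing import Any, Callable, Sequence


class StateType(Enum):
    # Hypothetical stand-ins for the state types named in the rules.
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"


def final_state(
    flow_fn: Callable[[], Any],
    child_states: Sequence[StateType] = (),
) -> StateType:
    """Toy model of the final-state rules for a single flow call."""
    try:
        result = flow_fn()
    except Exception:
        # Rule 1: an exception raised in the flow function fails the run.
        return StateType.FAILED

    if isinstance(result, StateType):
        # Rule 3: a manually created state is used as the final state.
        return result

    if result is None:
        # Rule 2: no return value, so the child run states decide.
        if StateType.FAILED in child_states:
            return StateType.FAILED
        if StateType.CANCELLED in child_states:
            return StateType.CANCELLED
        return StateType.COMPLETED

    # Rule 4: any other returned object marks the run as completed.
    return StateType.COMPLETED
```

For example, `final_state(lambda: None, [StateType.FAILED, StateType.COMPLETED])` evaluates to `StateType.FAILED`, while `final_state(lambda: "bar", [StateType.FAILED])` evaluates to `StateType.COMPLETED` because the flow returned an ordinary object.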
-The following examples illustrate each of these cases: - -### Raise an exception - -If an exception is raised within the flow function, the flow is immediately marked as failed. - -```python -from prefect import flow - -@flow -def always_fails_flow(): - raise ValueError("This flow immediately fails") - -if __name__ == "__main__": - always_fails_flow() -``` - -Running this flow produces the following result: - -```bash -22:22:36.864 | INFO | prefect.engine - Created flow run 'acrid-tuatara' for flow 'always-fails-flow' -22:22:37.060 | ERROR | Flow run 'acrid-tuatara' - Encountered exception during execution: -Traceback (most recent call last):... -ValueError: This flow immediately fails -``` - -### Return `none` - -A flow with no return statement is determined by the state of all of its task runs. - -```python -from prefect import flow, task - -@task -def always_fails_task(): - raise ValueError("I fail successfully") - -@task -def always_succeeds_task(): - print("I'm fail safe!") - return "success" - -@flow -def always_fails_flow(): - always_fails_task.submit().result(raise_on_failure=False) - always_succeeds_task() - -if __name__ == "__main__": - always_fails_flow() -``` - -Running this flow produces the following result: - -```bash -18:32:05.345 | INFO | prefect.engine - Created flow run 'auburn-lionfish' for flow 'always-fails-flow' -18:32:05.582 | INFO | Flow run 'auburn-lionfish' - Created task run 'always_fails_task-96e4be14-0' for task 'always_fails_task' -18:32:05.582 | INFO | Flow run 'auburn-lionfish' - Submitted task run 'always_fails_task-96e4be14-0' for execution. -18:32:05.610 | ERROR | Task run 'always_fails_task-96e4be14-0' - Encountered exception during execution: -Traceback (most recent call last): - ... 
-ValueError: I fail successfully -18:32:05.638 | ERROR | Task run 'always_fails_task-96e4be14-0' - Finished in state Failed('Task run encountered an exception.') -18:32:05.658 | INFO | Flow run 'auburn-lionfish' - Created task run 'always_succeeds_task-9c27db32-0' for task 'always_succeeds_task' -18:32:05.659 | INFO | Flow run 'auburn-lionfish' - Executing 'always_succeeds_task-9c27db32-0' immediately... -I'm fail safe! -18:32:05.703 | INFO | Task run 'always_succeeds_task-9c27db32-0' - Finished in state Completed() -18:32:05.730 | ERROR | Flow run 'auburn-lionfish' - Finished in state Failed('1/2 states failed.') -Traceback (most recent call last): - ... -ValueError: I fail successfully -``` - ### Return a future If a flow returns one or more futures, the final state is determined based on the underlying states. @@ -828,26 +562,30 @@ If a flow returns one or more futures, the final state is determined based on th ```python from prefect import flow, task + @task def always_fails_task(): raise ValueError("I fail successfully") + @task def always_succeeds_task(): print("I'm fail safe!") return "success" + @flow def always_succeeds_flow(): x = always_fails_task.submit().result(raise_on_failure=False) y = always_succeeds_task.submit(wait_for=[x]) return y + if __name__ == "__main__": always_succeeds_flow() ``` -Running this flow produces the following result—it succeeds because it returns the future of the task that succeeds: +This flow run finishes in a **Completed** final state because the flow returns the future of the task that succeeds: ```bash 18:35:24.965 | INFO | prefect.engine - Created flow run 'whispering-guan' for flow 'always-succeeds-flow' @@ -873,18 +611,22 @@ then determining if any of the states are not `COMPLETED`. 
```python from prefect import task, flow + @task def always_fails_task(): raise ValueError("I am bad task") + @task def always_succeeds_task(): return "foo" + @flow def always_succeeds_flow(): return "bar" + @flow def always_fails_flow(): x = always_fails_task() @@ -893,49 +635,38 @@ def always_fails_flow(): return x, y, z ``` -Running this flow produces the following result. -It fails because one of the three returned futures failed. -Note that the final state is `Failed`, but the states of each of the returned futures is included in the flow state: +Running `always_fails_flow` fails because one of the three returned futures fails. +Note that the states of each of the returned futures are included in the flow run output: ```bash -20:57:51.547 | INFO | prefect.engine - Created flow run 'impartial-gorilla' for flow 'always-fails-flow' -20:57:51.645 | INFO | Flow run 'impartial-gorilla' - Created task run 'always_fails_task-58ea43a6-0' for task 'always_fails_task' -20:57:51.686 | INFO | Flow run 'impartial-gorilla' - Created task run 'always_succeeds_task-c9014725-0' for task 'always_succeeds_task' -20:57:51.727 | ERROR | Task run 'always_fails_task-58ea43a6-0' - Encountered exception during execution: -Traceback (most recent call last):... -ValueError: I am bad task -20:57:51.787 | INFO | Task run 'always_succeeds_task-c9014725-0' - Finished in state Completed() -20:57:51.808 | INFO | Flow run 'impartial-gorilla' - Created subflow run 'unbiased-firefly' for flow 'always-succeeds-flow' -20:57:51.884 | ERROR | Task run 'always_fails_task-58ea43a6-0' - Finished in state Failed('Task run encountered an exception.') +... 
20:57:52.438 | INFO | Flow run 'unbiased-firefly' - Finished in state Completed() 20:57:52.811 | ERROR | Flow run 'impartial-gorilla' - Finished in state Failed('1/3 states failed.') Failed(message='1/3 states failed.', type=FAILED, result=(Failed(message='Task run encountered an exception.', type=FAILED, result=ValueError('I am bad task'), task_run_id=5fd4c697-7c4c-440d-8ebc-dd9c5bbf2245), Completed(message=None, type=COMPLETED, result='foo', task_run_id=df9b6256-f8ac-457c-ba69-0638ac9b9367), Completed(message=None, type=COMPLETED, result='bar', task_run_id=cfdbf4f1-dccd-4816-8d0f-128750017d0c)), flow_run_id=6d2ec094-001a-4cb0-a24e-d2051db6318d) ``` - -**Returning multiple states** - -When returning multiple states, they must be contained in a `set`, `list`, or `tuple`. -If using other collection types, the result of the contained states are checked. - +If multiple states are returned, they must be contained in a `set`, `list`, or `tuple`. ### Return a manual state -If a flow returns a manually created state, the final state is determined based on the return value. +If a flow returns a manually created state, the final state is determined based upon the return value. ```python from prefect import task, flow from prefect.states import Completed, Failed + @task def always_fails_task(): raise ValueError("I fail successfully") + @task def always_succeeds_task(): print("I'm fail safe!") return "success" + @flow def always_succeeds_flow(): x = always_fails_task.submit() @@ -945,62 +676,42 @@ def always_succeeds_flow(): else: return Failed(message="How did this happen!?") + if __name__ == "__main__": always_succeeds_flow() ``` -Running this flow produces the following result. 
+Running this flow produces the following result: ```bash -18:37:42.844 | INFO | prefect.engine - Created flow run 'lavender-elk' for flow 'always-succeeds-flow' -18:37:43.125 | INFO | Flow run 'lavender-elk' - Created task run 'always_fails_task-96e4be14-0' for task 'always_fails_task' -18:37:43.126 | INFO | Flow run 'lavender-elk' - Submitted task run 'always_fails_task-96e4be14-0' for execution. -18:37:43.162 | INFO | Flow run 'lavender-elk' - Created task run 'always_succeeds_task-9c27db32-0' for task 'always_succeeds_task' -18:37:43.163 | INFO | Flow run 'lavender-elk' - Submitted task run 'always_succeeds_task-9c27db32-0' for execution. -18:37:43.175 | ERROR | Task run 'always_fails_task-96e4be14-0' - Encountered exception during execution: -Traceback (most recent call last): - ... +... ValueError: I fail successfully +07:29:34.754 | INFO | Task run 'always_succeeds_task-0' - Created task run 'always_succeeds_task-0' for task 'always_succeeds_task' +07:29:34.848 | ERROR | Task run 'always_fails_task-0' - Finished in state Failed('Task run encountered an exception ValueError: I fail successfully') I'm fail safe! -18:37:43.217 | ERROR | Task run 'always_fails_task-96e4be14-0' - Finished in state Failed('Task run encountered an exception.') -18:37:43.236 | INFO | Task run 'always_succeeds_task-9c27db32-0' - Finished in state Completed() -18:37:43.264 | INFO | Flow run 'lavender-elk' - Finished in state Completed('I am happy with this result') +07:29:35.086 | INFO | Task run 'always_succeeds_task-0' - Finished in state Completed() +07:29:35.225 | INFO | Flow run 'hidden-butterfly' - Finished in state Completed('I am happy with this result') ``` -### Return an object +If a flow run returns any other object, then it is recorded as `COMPLETED`. -If the flow run returns _any other object_, then it is marked as completed. +## Retries -```python -from prefect import task, flow +Unexpected errors may occur in workflows.
+For example, the GitHub API may be temporarily unavailable or rate limited. -@task -def always_fails_task(): - raise ValueError("I fail successfully") +Prefect can automatically retry flow runs on failure. -@flow -def always_succeeds_flow(): - always_fails_task().submit() - return "foo" +To enable retries, pass an integer to the flow's `retries` parameter. +If the flow run fails, Prefect will retry it up to `retries` times. -if __name__ == "__main__": - always_succeeds_flow() -``` +If the flow run fails on the final retry, Prefect records the final flow run state as _failed_. -Running this flow produces the following result. +Optionally, pass an integer to `retry_delay_seconds` to specify how many seconds to wait between retry attempts. -```bash -21:02:45.715 | INFO | prefect.engine - Created flow run 'sparkling-pony' for flow 'always-succeeds-flow' -21:02:45.816 | INFO | Flow run 'sparkling-pony' - Created task run 'always_fails_task-58ea43a6-0' for task 'always_fails_task' -21:02:45.853 | ERROR | Task run 'always_fails_task-58ea43a6-0' - Encountered exception during execution: -Traceback (most recent call last):... -ValueError: I am bad task -21:02:45.879 | ERROR | Task run 'always_fails_task-58ea43a6-0' - Finished in state Failed('Task run encountered an exception.') -21:02:46.593 | INFO | Flow run 'sparkling-pony' - Finished in state Completed() -Completed(message=None, type=COMPLETED, result='foo', flow_run_id=7240e6f5-f0a8-4e00-9440-a7b33fb51153) -``` +Check out [Transactions](/3.0rc/develop/transactions/) to make your flows even more resilient and roll back actions when desired. ## See also - Store and reuse non-sensitive bits of data, such as configuration information, by using [variables](/3.0rc/develop/variables). -- Supercharge your flow with [tasks](/3.0rc/develop/write-tasks/) to break down the workflow's complexity and make it more performant and observable.
+- Make your flow more manageable, performant, and observable by breaking it into discrete units of orchestrated work with [tasks](/3.0rc/develop/write-tasks/).
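
The retry behavior described in the Retries section can be modeled in plain Python. The sketch below does not use Prefect and is not how Prefect implements retries; `run_with_retries` and `flaky_fetch` are hypothetical names invented for illustration. It only shows what "retry up to `retries` times, waiting `retry_delay_seconds` between attempts" means: one initial attempt plus at most `retries` further attempts.

```python
import time


def run_with_retries(fn, retries=0, retry_delay_seconds=0, sleep=time.sleep):
    """Hypothetical helper (not a Prefect API): call fn, retrying on failure.

    Returns a (state, value, attempts) tuple, where state is "Completed"
    or "Failed" and value is the return value or the last exception.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return "Completed", fn(), attempts
        except Exception as exc:
            # The initial attempt plus `retries` retries have been used up.
            if attempts > retries:
                return "Failed", exc, attempts
            sleep(retry_delay_seconds)


calls = {"count": 0}


def flaky_fetch():
    """Simulates an API that is unavailable for the first two calls."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporarily unavailable")
    return "repo stats"


# Assumed to correspond roughly to decorating a flow with
# @flow(retries=3, retry_delay_seconds=0) in Prefect.
state, result, attempts = run_with_retries(flaky_fetch, retries=3)
print(state, result, attempts)  # Completed repo stats 3
```

With `retries=1` the same function would end in a failed state after two attempts, which mirrors the rule that a failure on the final retry is recorded as the final flow run state.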