From 87eb13c5cc756d6f7a7759ed1a426e6dc55c6ddf Mon Sep 17 00:00:00 2001 From: nate nowack Date: Thu, 25 Jul 2024 09:12:37 -0500 Subject: [PATCH] Add migration docs about final flow run state (#14714) --- docs/3.0rc/develop/write-flows.mdx | 2 +- docs/3.0rc/resources/upgrade-prefect-3.mdx | 89 ++++++++++++++++++++++ 2 files changed, 90 insertions(+), 1 deletion(-) diff --git a/docs/3.0rc/develop/write-flows.mdx b/docs/3.0rc/develop/write-flows.mdx index e25fd3b3ffb9..11791dd8f380 100644 --- a/docs/3.0rc/develop/write-flows.mdx +++ b/docs/3.0rc/develop/write-flows.mdx @@ -26,7 +26,7 @@ Flows have the following capabilities: - All runs of the flow have persistent [state](/3.0rc/develop/manage-states/). Transitions between states are recorded, allowing you to observe and act on flow execution. - Input arguments can be type validated as workflow [parameters](/#specify-flow-parameters). -- [Retries](/#retries) can be performed on failure. +- [Retries](#retries) can be performed on failure. - Timeouts can be enforced to prevent unintentional, long-running workflows. - Metadata about [flow runs](#flow-runs), such as run time and final state, is automatically tracked. - A flow can be [deployed](/3.0rc/deploy/infrastructure-examples/docker/), which exposes an API for interacting with it remotely. diff --git a/docs/3.0rc/resources/upgrade-prefect-3.mdx b/docs/3.0rc/resources/upgrade-prefect-3.mdx index 713c5f4bdf68..e6703604596a 100644 --- a/docs/3.0rc/resources/upgrade-prefect-3.mdx +++ b/docs/3.0rc/resources/upgrade-prefect-3.mdx @@ -245,3 +245,92 @@ If you [self-host a Prefect server instance](/3.0rc/manage/self-host), run the f ```bash prefect server database upgrade -y ``` + +## Flow run final state determination + +In Prefect 2, the final state of a flow run was influenced by the states of its task runs; if any task run failed, the flow run was marked as failed. + +In Prefect 3, the final state of a flow run is entirely determined by: + +1. The `return` value of the flow function (same as in Prefect 2): + - Literal values are considered successful. + - Any explicit `State` that is returned will be considered the final state of the flow run. If an iterable of `State` objects is returned, all must be `Completed` for the flow run to be considered `Completed`. If any are `Failed`, the flow run will be marked as `Failed`. + +2. Whether the flow function allows an exception to `raise`: + - Exceptions that are allowed to propagate will result in a `Failed` state. + - Exceptions suppressed with `raise_on_failure=False` will not affect the flow run state. + +This change means that task failures within a flow do not automatically cause the flow run to fail unless they affect the flow's return value or raise an uncaught exception. + + +When migrating from Prefect 2 to Prefect 3, be aware that flows may now complete successfully even if they contain failed tasks, unless you explicitly handle task failures. + + +To ensure your flow fails when critical tasks fail, consider these approaches: + +1. Allow exceptions to propagate by not using `raise_on_failure=False`. +2. Use `return_state=True` and explicitly check task states to conditionally `raise` the underlying exception or return a failed state. +3. Use try/except blocks to handle task failures and return appropriate states. + +### Examples + + +```python Allow Unhandled Exceptions +from prefect import flow, task + +@task +def failing_task(): + raise ValueError("Task failed") + +@flow +def my_flow(): + failing_task() # Exception propagates, causing flow failure + +try: + my_flow() +except ValueError as e: + print(f"Flow failed: {e}") # Output: Flow failed: Task failed +``` + +```python Use return_state +from prefect import flow, task +from prefect.states import Failed + +@task +def failing_task(): + raise ValueError("Task failed") + +@flow +def my_flow(): + state = failing_task(return_state=True) + if state.is_failed(): + raise ValueError(state.result()) + return "Flow completed successfully" + +try: + print(my_flow()) +except ValueError as e: + print(f"Flow failed: {e}") # Output: Flow failed: Task failed +``` + +```python Use try/except +from prefect import flow, task +from prefect.states import Failed + +@task +def failing_task(): + raise ValueError("Task failed") + +@flow +def my_flow(): + try: + failing_task() + except ValueError: + return Failed(message="Flow failed due to task failure") + return "Flow completed successfully" + +print(my_flow()) # Output: Failed(message='Flow failed due to task failure') +``` + + +Choose the strategy that best fits your specific use case and error handling requirements. \ No newline at end of file