Inconsistent raise_on_failure=True behaviour for flow_run.state.result() when using run_deployment #15276

Closed
benjamincerigo opened this issue Sep 8, 2024 · 4 comments · Fixed by #15509
Labels
bug Something isn't working

Comments

@benjamincerigo
Contributor

benjamincerigo commented Sep 8, 2024

Bug summary

As documented here, raise_on_failure is supposed to raise an exception if the flow run fails. When using run_deployment, this seems to have some inconsistent (and undocumented) behaviour.

There seem to be a number of ways to call flow_run.state.result; see the script in the additional context for the ways I tried.

It only seems to work if you use await flow_run.state.result(fetch=True, raise_on_failure=True), that is, with both await and fetch=True.

In the additional context of this issue is a script that you can use to see the different behaviors of state.result() and here are the logs of that run:
raise_on_failure_example_logs.csv

The important logs, which show that the other uses of flow_run.state.result do not raise the exception from the subflow:

Running flow as deployment: flow_with_exception/flow_with_exception
10:41:21 AM
run_flow_as_deployment-b43
prefect.task_runs
Result from `flow_run.state.result()`: None
10:41:26 AM
run_flow_as_deployment-b43
prefect.task_runs
Result `flow_run.state.result(raise_on_failure=True)`: None
10:41:26 AM
run_flow_as_deployment-b43
prefect.task_runs
Result `flow_run.state.result(fetch=True, raise_on_failure=True)`: <coroutine object sync_compatible.<locals>.coroutine_wrapper.<locals>.ctx_call at 0x7e904cf21640>
10:41:26 AM
run_flow_as_deployment-b43
prefect.task_runs
Task run failed with exception: FailedRun('Flow run encountered an exception: ValueError: This is an exception.') - Retries are exhausted
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/task_engine.py", line 1258, in run_context
    yield self
  File "/usr/local/lib/python3.11/site-packages/prefect/task_engine.py", line 1335, in run_task_async
    await engine.call_task_fn(txn)
  File "/usr/local/lib/python3.11/site-packages/prefect/task_engine.py", line 1281, in call_task_fn
    result = await call_with_parameters(self.task.fn, parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/projects/phiphi/phiphi/pipeline_jobs/gathers/example.py", line 59, in run_flow_as_deployment
    result = await flow_run.state.result(fetch=True, raise_on_failure=True)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 382, in ctx_call
    result = await async_fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/states.py", line 128, in _get_state_result
    raise await get_state_exception(state)
prefect.exceptions.FailedRun: Flow run encountered an exception: ValueError: This is an exception.

It seems that state.result() behaves very differently depending on whether it is called from an async or a sync context, which can make it very difficult to use run_deployment in a predictable way.

Version info (prefect version output)

Version:             3.0.0
API version:         0.8.4
Python version:      3.11.8
Git commit:          c40d069d
Built:               Tue, Sep 3, 2024 11:13 AM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         cloud
Pydantic version:    2.9.0

Additional context

Example

Run this with:
python example.py

Then run the parent_flow

prefect deployment run parent_flow/parent_flow

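import prefect
import prefect.deployments  # imported explicitly for run_deployment
import prefect.exceptions  # imported explicitly for MissingResult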


@prefect.flow(name="flow_with_none_return")
def flow_with_none_return() -> None:
    """Flow with None return."""
    logger = prefect.get_run_logger()
    logger.info("Running flow with None return.")
    return None


@prefect.flow(name="flow_with_return")
def flow_with_return() -> str:
    """Flow with return."""
    logger = prefect.get_run_logger()
    logger.info("Running flow with return.")
    return "Hello, world!"


@prefect.flow(name="flow_with_exception")
def flow_with_exception() -> None:
    """Flow with exception."""
    logger = prefect.get_run_logger()
    logger.info("Running flow with exception.")
    raise ValueError("This is an exception.")


@prefect.task
async def run_flow_as_deployment(name: str) -> None:
    """Run flow as deployment."""
    logger = prefect.get_run_logger()
    logger.info(f"Running flow as deployment: {name}")
    flow_run = await prefect.deployments.run_deployment(
        name=name,
        as_subflow=True,
    )
    result = flow_run.state.result()
    logger.info(f"Result from `flow_run.state.result()`: {result}")
    result = flow_run.state.result(raise_on_failure=True)
    logger.info(f"Result `flow_run.state.result(raise_on_failure=True)`: {result}")
    result = flow_run.state.result(fetch=True, raise_on_failure=True)
    logger.info(f"Result `flow_run.state.result(fetch=True, raise_on_failure=True)`: {result}")
    try:
        # This is a very important undocumented bit of code
        # For the raise_on_failure to work you have to do both await and fetch=True
        result = await flow_run.state.result(fetch=True, raise_on_failure=True)
        logger.info(f"Result `await flow_run.state.result(fetch=True, raise_on_failure=True)`: {result}")
    except prefect.exceptions.MissingResult:
        logger.info("""
        Successfully ran the flow, but no result was returned.
        This is expected if the flow does not have persisted results.
        Using result that was returned without fetching.
        """)


@prefect.flow(name="parent_flow")
async def parent_flow() -> None:
    """Parent flow."""
    logger = prefect.get_run_logger()
    logger.info("Running parent flow.")
    await run_flow_as_deployment("flow_with_none_return/flow_with_none_return")
    await run_flow_as_deployment("flow_with_return/flow_with_return")
    await run_flow_as_deployment("flow_with_exception/flow_with_exception")


if __name__ == "__main__":
    flow_with_none_return = flow_with_none_return.to_deployment(name="flow_with_none_return")
    flow_with_return = flow_with_return.to_deployment(name="flow_with_return")
    flow_with_exception = flow_with_exception.to_deployment(name="flow_with_exception")
    parent_flow = parent_flow.to_deployment(name="parent_flow")
    prefect.serve(flow_with_none_return, flow_with_return, flow_with_exception, parent_flow)
@benjamincerigo benjamincerigo added the bug Something isn't working label Sep 8, 2024
@desertaxle
Member

Hey @benjamincerigo, I'm super excited to see you trying out 3.0!

You're right that this interface could be more straightforward, and I'll do my best to shed some light on it. In most cases, we hold the task or flow's return value in memory so that when you ask for that result from the state, we can return it immediately. This works when a task or flow is run in the same memory space. When you run a flow via a deployment, that flow will run in a different memory space, so when you get the result, Prefect will need to load a persisted result from a storage location. Previously, State.result() was always synchronous, but when we read from a remote storage location, we perform that read asynchronously. The fetch kwarg was added to preserve sync/async backward compatibility and is required to tell Prefect to fetch a persisted result.

If you don't want to pass fetch every time, you can set PREFECT_ASYNC_FETCH_STATE_RESULT=True in your profile or via an environment variable. We hope to improve this area via our updated sync/async handling (proposal in #15008).

Please let me know if you have any additional questions or suggestions for improvement in this area!

@cicdw
Member

cicdw commented Sep 9, 2024

Another thing I'm noticing about your report is that sometimes state.result is asynchronous and returns an awaitable, and other times it runs synchronously. Another user had a similar report with state handlers, so I'll copy my response to them here:

I think I understand what's happening here; sync/async handling was updated in 3.0 for a number of reasons (largely performance) and this causes some behavior changes on internal interfaces such as state.result in the context of an asynchronous task or flow. Essentially any internal Prefect interface that is asynchronously defined will return an awaitable/coroutine when run in the context of an async task or flow. You can alter this behavior explicitly through a special _sync: bool kwarg on affected interfaces. Setting this to False will result in deterministically asynchronous execution, and setting it to True will result in deterministically synchronous execution.

So in your case, you have a number of state.result() calls within an asynchronous flow (e.g., result = flow_run.state.result()). These need to be awaited in an async context. If you would rather not await them for any reason, you can force synchronous execution through the _sync flag: result = flow_run.state.result(_sync=True).
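
The behaviour described in this comment can be modelled with a small toy decorator. This is only a sketch under stated assumptions: toy_sync_compatible and toy_result are hypothetical names, and the code is not Prefect's sync_compatible implementation. It only imitates the observable behaviour reported in this thread: a coroutine comes back in an async context unless the call is awaited or _sync=True is passed.

```python
import asyncio
import concurrent.futures
import functools
import inspect


def toy_sync_compatible(async_fn):
    """Toy model only; NOT Prefect's sync_compatible implementation."""

    @functools.wraps(async_fn)
    def wrapper(*args, _sync=None, **kwargs):
        # Detect whether the caller is inside a running event loop.
        try:
            asyncio.get_running_loop()
            in_async_context = True
        except RuntimeError:
            in_async_context = False

        # Default: async callers get a coroutine, sync callers get a value.
        run_sync = (not in_async_context) if _sync is None else _sync
        if not run_sync:
            return async_fn(*args, **kwargs)
        if not in_async_context:
            return asyncio.run(async_fn(*args, **kwargs))
        # A loop is already running in this thread, so use a helper thread.
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, async_fn(*args, **kwargs)).result()

    return wrapper


@toy_sync_compatible
async def toy_result():
    """Hypothetical stand-in for an asynchronously defined interface."""
    await asyncio.sleep(0)
    return "value"


async def async_caller():
    unawaited = toy_result()              # coroutine object, like the log above
    is_coro = inspect.iscoroutine(unawaited)
    awaited = await unawaited             # awaiting yields the real value
    forced = toy_result(_sync=True)       # forced synchronous execution
    return is_coro, awaited, forced


sync_value = toy_result()                 # no running loop: runs synchronously
async_values = asyncio.run(async_caller())
```

In this toy, forcing synchronous execution inside an async context blocks the event loop while the helper thread runs, which is one reason awaiting is usually the more natural choice there.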

@benjamincerigo
Contributor Author

Thanks for the explanation.

Some clarifying questions and points:

  • Is it correct that we could, and maybe should, be doing the checks on the state here even if fetch is False? Otherwise raise_on_failure only works if fetch=True (or the code works out that fetch should be True), as you can see in this if.
  • If the above is not true and you can't do the state checks without fetch, then it would seem that state.result should only be allowed to run as an async function/with fetch=True. Or maybe the documentation of the function should indicate that the behaviour differs between async and sync and between fetch=True and fetch=False (i.e. raise_on_failure doesn't work without a fetch, magic or not), and that it is a good idea to always await state.result(fetch=True).
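
The difference between the two bullets above can be sketched with a toy model. Everything here is an assumption made for illustration: ToyState, ToyFailedRun, result_as_reported and result_as_suggested are hypothetical names, and the branching only imitates the behaviour reported in this issue, not Prefect's actual source.

```python
from dataclasses import dataclass
from typing import Any, Optional


class ToyFailedRun(Exception):
    """Hypothetical stand-in for prefect.exceptions.FailedRun."""


@dataclass
class ToyState:
    """Hypothetical stand-in for the state of a flow run from run_deployment."""

    failed: bool
    message: str = ""
    in_memory_result: Optional[Any] = None


def result_as_reported(state, fetch=False, raise_on_failure=True):
    # Models the report: without fetch, the failure check is never reached.
    if not fetch:
        return state.in_memory_result
    if raise_on_failure and state.failed:
        raise ToyFailedRun(state.message)
    return state.in_memory_result


def result_as_suggested(state, fetch=False, raise_on_failure=True):
    # Models the first bullet: check the state before deciding whether to fetch.
    if raise_on_failure and state.failed:
        raise ToyFailedRun(state.message)
    return state.in_memory_result


failed_state = ToyState(failed=True, message="ValueError: This is an exception.")

silent_result = result_as_reported(failed_state)  # failure goes unnoticed
try:
    result_as_suggested(failed_state)
    suggested_raised = False
except ToyFailedRun:
    suggested_raised = True
```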

Backwards-compatible stuff is tricky, especially when combined with async/sync stuff.

@desertaxle
Member

desertaxle commented Sep 10, 2024

I think we should simplify result fetching by having explicit sync and async methods for retrieving results that consistently perform the necessary fetching. This will remove the edge case you've identified. It'll take us a little while to get there, so adding an example to this method for this edge case would be very useful. Would you be willing to submit a PR adding that example?
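
One possible shape for such a split, as a minimal sketch only: the class ToySplitState and the method names result and aresult below are assumptions for illustration, not an interface confirmed anywhere in this thread. The point is that both entry points share the same failure check and the same fetch, so neither depends on a fetch flag.

```python
import asyncio


class ToySplitFailedRun(Exception):
    """Hypothetical stand-in for prefect.exceptions.FailedRun."""


class ToySplitState:
    """Hypothetical state with explicit sync and async result accessors."""

    def __init__(self, failed, stored_value=None):
        self.failed = failed
        self._stored_value = stored_value

    async def _load_persisted(self):
        # Stands in for an asynchronous read from result storage.
        await asyncio.sleep(0)
        return self._stored_value

    async def aresult(self, raise_on_failure=True):
        # Async accessor: always checks the state, then always fetches.
        if raise_on_failure and self.failed:
            raise ToySplitFailedRun("flow run failed")
        return await self._load_persisted()

    def result(self, raise_on_failure=True):
        # Sync accessor: same logic, valid only when no event loop is running.
        return asyncio.run(self.aresult(raise_on_failure=raise_on_failure))


ok_state = ToySplitState(failed=False, stored_value="Hello, world!")
sync_result = ok_state.result()
async_result = asyncio.run(ok_state.aresult())
```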

benjamincerigo added a commit to benjamincerigo/prefect that referenced this issue Sep 27, 2024
To give light on the complex behaviour of `state.result` when used with
`run_deployment`.

Closes issue: PrefectHQ#15276