Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prefect3.0 not running subflows in parallel #15415

Closed
captnced opened this issue Sep 18, 2024 · 9 comments · Fixed by #16095
Closed

prefect3.0 not running subflows in parallel #15415

captnced opened this issue Sep 18, 2024 · 9 comments · Fixed by #16095

Comments

@captnced
Copy link

captnced commented Sep 18, 2024

hello community !
I'm using prefect==3.0.2 and there seem to be an issue again with running subflows concurrently

this code triggers 5 subflows that are being run sequentially:

@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=3)
)
async def subflow_1():
    print("Subflow 1 started!")
    await asyncio.sleep(1)


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=3)
)
async def main_flow():
    for _ in range(5):
        await subflow_1()

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())

even when using workpool, workers and ad-hoc deployment, I still cannot have my subflows running in parallel:

@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=3),
    log_prints=True,
)
async def master(names: list[str]):
    _r = map(main,names)
    r = await asyncio.gather(*_r)   
    print('master > ',r)
    return r

@task(
    log_prints=True
)
async def doit(name):
    print('task <',name)
    await asyncio.sleep(5)
    return f'hello {name} !!'


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=3),
    log_prints=True,
)
async def main(name):
    print('main <',name)
    _r = doit.submit(name)
    res = _r.result()
    print('main >',res)
    return res


if __name__ == "__main__":

    '''
        CREATE WORKPOOL + WORKERS + DEPLOY
    '''
    # prefect work-pool create --overwrite --type process wp1 --set-as-default
    # prefect worker start --pool wp1 --limit 5 --type process ( << run several of these)
    # prefect deploy /work/path/test_1.py:master --pool wp1 --work-queue default --concurrency-limit 10 --name dep1
    async def run_deployment(dep_id,names):
        async with get_client() as client:            
            flow_run = await client.create_flow_run_from_deployment(
                deployment_id=dep_id,
                parameters=dict(names=names),
                job_variables={'names':names}
            )
            print('flow run',flow_run)

    dp_id='c8cb8a2c-0427-4474-a9fc-583bcc78bf98'
    asyncio.run(run_deployment(dp_id,['charlie', 'bob', 'alice']))

am I doing anything wrong ?

Originally posted by @captnced in #5853 (comment)

@captnced
Copy link
Author

captnced commented Sep 18, 2024

update : if I make use of a wrapper task to call the subflow then the subflows are being run in parallel.

import asyncio
from prefect import deploy, flow, task
from datetime import datetime, timedelta, timezone
from prefect.client.schemas.objects import FlowRun
from prefect.client.orchestration import get_client
from prefect.states import Scheduled
from prefect.task_runners import ThreadPoolTaskRunner


@task(
    log_prints=True,
)
async def doit_wrapper_task(name):
    return await main(name) # TASK > SUB TASK

@task(
    log_prints=True
)
async def doit(name):
    print('task <',name)
    await asyncio.sleep(5)
    return f'hello {name} !!'


@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=20),
    log_prints=True,
)
async def main(name):
    print('main <',name)
    _r = doit.submit(name)
    res = _r.result()
    print('main >',res)
    return res



@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=20),
    log_prints=True,
)
async def master(names: list[str]):
    _r = [doit_wrapper_task.submit(name) for name in names]
    r = [ __r.result() for __r in _r]

    print('master > ',r)
    return r

@desertaxle desertaxle moved this to Backlog in OSS Backlog Sep 18, 2024
@desertaxle
Copy link
Member

Thanks for the issue @captnced!

It looks like you need an asyncio.gather call in your first example:

import asyncio

from prefect import flow


@flow
async def subflow_1():
    print("Subflow 1 started!")
    await asyncio.sleep(1)


@flow
async def main_flow():
    await asyncio.gather(*[subflow_1() for _ in range(5)])


if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())

However, even with asyncio.gather, the starting of those flow runs appears staggered, suggesting that something is blocking the event loop in the flow run engine. I suspect the example in your comment works because each child flow run happens in a separate thread, which works around this issue.

We can investigate what's blocking the event loop in the flow run engine and post updates here. If you discover anything new, please leave a comment!

@tyong920
Copy link

@desertaxle By the way, may I ask if Prefect, especially version 3.0, supports parallel execution of subflows? I haven’t found any documentation or examples mentioning this feature. If you could provide any clarification, I would greatly appreciate it.

@desertaxle
Copy link
Member

@tyong920 you should be able to run child flows concurrently/in parallel as you would any other Python function. This issue shows some snags when using asyncio for concurrency due to some blocking behavior in the engine. Still, you should be able to use threads or processes to achieve concurrency or parallelism for your child flows.

@tyong920
Copy link

@tyong920 you should be able to run child flows concurrently/in parallel as you would any other Python function. This issue shows some snags when using asyncio for concurrency due to some blocking behavior in the engine. Still, you should be able to use threads or processes to achieve concurrency or parallelism for your child flows.

Thank you very much for your response.

I’d like to clarify my understanding. As you mentioned, if I treat a subflow like a regular Python function—for example, using concurrent.futures.ThreadPoolExecutor() as follows:
with concurrent.futures.ThreadPoolExecutor() as executor: executor.submit(sub_flow, ...)

I believe this should work. However, is this the recommended approach for running subflows in parallel in Prefect v3? It seems that in this case, the task_runner=ThreadPoolTaskRunner() in the main flow isn’t being fully utilized.

@desertaxle
Copy link
Member

@bwooldridge-alphalayer do you have a code example that you could share where you see child flows sharing the same thread when used with the ThreadPoolTaskRunner? I'm very surprised by that and would like to find out why that's happening.

We'll tackle the root cause of this issue when working on #15008. We plan to do so after we complete our current settings milestone.

@bwooldridge-alphalayer
Copy link

@bwooldridge-alphalayer do you have a code example that you could share where you see child flows sharing the same thread when used with the ThreadPoolTaskRunner? I'm very surprised by that and would like to find out why that's happening.

We'll tackle the root cause of this issue when working on #15008. We plan to do so after we complete our current settings milestone.

Hi @desertaxle, thanks for the quick reply. I was able to get my subflows working in parallel using the task wrapper approach. I'll delete my previous message to avoid future confusion.

@brett-patterson-ent
Copy link

@desertaxle We're seeing the same issue where the flow setup itself is synchronous even when running an async flow. In Prefect 2, I believe the analogous flow setup used an async Prefect client so all network calls to the Prefect server could yield to other tasks on the event loop. For flows that launch many subflows concurrently, this ends up being a significant performance penalty.

While there are workarounds like executing subflows on their own threads, there are drawbacks to these workarounds that make them unappealing for our use cases. Will #15008 address making the flow engine itself asynchronous when running async flows? Thanks!

@zzstoatzz
Copy link
Collaborator

hi @brett-patterson-ent - that issue you linked is more about public interfaces being sync_compatible and fixing this than it is about as AsyncFlowEngine - if you look at task_engine.py that should give a sense of what we would need to do to flow_engine.py, which is likely something we will do (but im not sure an issue exists for that yet).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: Done
Development

Successfully merging a pull request may close this issue.

6 participants