Streaming workflow with nested workflow(s) #15838
Hi, I am trying to stream events from a workflow, as documented in https://docs.llamaindex.ai/en/stable/understanding/workflows/stream/, and it works. However, when one or more nested workflows are invoked from a step in the main workflow (as shown in https://docs.llamaindex.ai/en/stable/understanding/workflows/nested/, but running the nested workflow as an async task), I get this error:

llama_index.core.workflow.errors.WorkflowRuntimeError: This workflow has multiple concurrent runs in progress and cannot stream events. To be able to stream events, make sure you call `run()` on this workflow only once.

I am trying to stream events from the nested workflow upwards through the main workflow as follows:

async for nested_ev in react_workflow.stream_events():
    ctx.write_event_to_stream(Event(msg=nested_ev.msg))

I think I know what is going wrong: the step in the main workflow that calls the nested workflow as an async task can be triggered multiple times with different data while earlier nested workflow tasks have not finished. That is why the error is about calling `run()` more than once. How can I pass different data to different calls to the nested workflow if I can call `run()` only once? Or is there a far better way of doing this?
Replies: 1 comment 8 replies
To pass different data to multiple calls of the nested workflow while making sure each invocation is awaited before a new one starts, you can register the nested workflow on the main workflow with `add_workflows()` and await its `run()` call inside the step. Here is an example of how you can achieve this:

import asyncio

from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step, Context

# Define your main workflow
class MainWorkflow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent, reflection_workflow: Workflow) -> StopEvent:
        print("Need to run reflection")
        # Await the nested run so it finishes before this step returns
        res = await reflection_workflow.run(query=ev.query)
        # A workflow must end with a StopEvent
        return StopEvent(result=res)

# Define your nested workflow
class NestedWorkflow(Workflow):
    @step
    async def sub_start(self, ctx: Context, ev: StartEvent) -> StopEvent:
        print("Doing custom reflection")
        return StopEvent(result="Improved query")

# Create instances of the workflows
main_workflow = MainWorkflow()
nested_workflow1 = NestedWorkflow()
nested_workflow2 = NestedWorkflow()

# Add a nested workflow to the main workflow
main_workflow.add_workflows(reflection_workflow=nested_workflow1)

# Run the nested workflows one at a time, each with its own data
# (top-level await, e.g. in a notebook)
await nested_workflow1.run(query="data_for_first_call")
await nested_workflow2.run(query="data_for_second_call")

# Start the main workflow, then stream its events while it runs.
# Assumes a llama_index version where stream_events() is a Workflow method,
# as in the question; the query value is only a placeholder.
task = asyncio.create_task(main_workflow.run(query="data_for_main_call"))
async for event in main_workflow.stream_events():
    print(event)
await task
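The idea of awaiting each nested invocation before the next one starts can also be sketched without llama_index. The following is a minimal illustration using plain asyncio, in which `ToyNestedWorkflow` is a hypothetical stand-in (not a llama_index class) and an `asyncio.Lock` is one assumed way to serialize callers that arrive concurrently:

```python
import asyncio


class ToyNestedWorkflow:
    """Hypothetical stand-in for a nested workflow; not part of llama_index."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self.active_runs = 0
        self.max_active_runs = 0

    async def run(self, query: str) -> str:
        # Concurrent callers queue up on the lock, so runs never overlap.
        async with self._lock:
            self.active_runs += 1
            self.max_active_runs = max(self.max_active_runs, self.active_runs)
            await asyncio.sleep(0.01)  # simulate work inside the nested workflow
            self.active_runs -= 1
            return f"improved: {query}"


async def main():
    nested = ToyNestedWorkflow()
    # Three triggers arrive at once, each with different data.
    results = await asyncio.gather(
        *(nested.run(q) for q in ["first", "second", "third"])
    )
    return list(results), nested.max_active_runs


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The trade-off is that serialized runs lose parallelism: every trigger waits for the previous nested run to finish.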
To handle multiple parallel triggers in the main workflow that call the nested workflow without causing the `WorkflowRuntimeError` due to concurrent runs, you can dynamically create and manage multiple instances of the nested workflow. This approach ensures that each nested workflow instance is only run once, thus avoiding the `WorkflowRuntimeError`. Here is an example of how you can achieve this:
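As a library-free warm-up, the one-instance-per-call idea can be sketched with plain asyncio. `ToyStreamingWorkflow`, `run_nested`, and the list used in place of `ctx.write_event_to_stream` are hypothetical stand-ins, not llama_index APIs; the sketch only assumes that an instance owns a single event stream and therefore supports one run at a time.

```python
import asyncio


class ToyStreamingWorkflow:
    """Hypothetical stand-in: one event queue per instance, so one run per instance."""

    def __init__(self) -> None:
        self.events: asyncio.Queue = asyncio.Queue()
        self._running = False

    async def run(self, query: str) -> str:
        if self._running:
            raise RuntimeError("concurrent runs on one instance cannot stream events")
        self._running = True
        try:
            for i in range(2):
                await asyncio.sleep(0)  # yield control, as real work would
                await self.events.put(f"{query}: progress {i}")
            return f"done: {query}"
        finally:
            self._running = False
            await self.events.put(None)  # sentinel that ends the stream

    async def stream_events(self):
        while (ev := await self.events.get()) is not None:
            yield ev


async def run_nested(query: str, forwarded: list) -> str:
    wf = ToyStreamingWorkflow()  # fresh instance for this call only
    task = asyncio.create_task(wf.run(query))
    async for ev in wf.stream_events():
        forwarded.append(ev)  # stands in for forwarding to the parent stream
    return await task


async def main():
    forwarded = []
    # Two parallel triggers, each with its own data and its own instance.
    results = await asyncio.gather(
        run_nested("a", forwarded), run_nested("b", forwarded)
    )
    return results, forwarded


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because each call owns its instance, no instance ever sees a second `run()` while the first is still in progress, and the events from parallel calls can be interleaved safely in the parent stream.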
Main Workflow