Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

action_on_event_received only process events sent by current workflow execution #358

Open
jiangxin369 opened this issue Jul 22, 2022 · 0 comments
Assignees

Comments

@jiangxin369
Copy link
Contributor

jiangxin369 commented Jul 22, 2022

Describe the feature

Currently, the event is broadcasting, once the wanted event is sent, all workflow executions of this workflow would be triggered.

Describe the solution you'd like

As the scheduler dispatcher would inspect the context of each event to figure out if it contains the workflow execution id, we can inject the runtime context of each task execution to the user-defined event to make sure the event will only effect on the current workflow execution.

We need the following changes:

  • Add a global variable _CURRENT_TASK_CONTEXT in context.py, it is used to store the runtime context of each task execution.
  • To read and write the _CURRENT_TASK_CONTEXT, add get_runtime_task_context and set_runtime_task_context functions.
  • Add a public API called wrap_execution_context to inject the runtime info to the context of the event before sending it.
_CURRENT_TASK_CONTEXT: TaskExecutionContext = None


def set_runtime_task_context(context: TaskExecutionContext):
    global _CURRENT_TASK_CONTEXT
    _CURRENT_TASK_CONTEXT = context


def get_runtime_task_context():
    return _CURRENT_TASK_CONTEXT


def wrap_execution_info_to_context(event: Event):
    """
  The event whose context is wrapped with workflow execution info would only be processed by specific workflow execution.
  """
    pass

How to use it?

def func():
    notification_client = get_notification_client()
    event = Event(event_key=EVENT_KEY, message='This is a custom message.')
    
    // wrap event with context
    wrap_execution_info_to_context(event)
    // send event
    notification_client.send_event(event)

with Workflow(name='workflow') as w1:
    task = PythonOperator(name='task', python_callable=func)

Describe alternatives you've considered

Additional context

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant