From f7d0216b456c05a01065446d17ea893207ee8fc0 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Wed, 31 Jan 2024 14:09:24 -0800 Subject: [PATCH] Webhook task (#245) support for Webhook Task --- examples/copilot/open_ai_copilot.py | 2 +- examples/wait_for_webhook.py | 76 +++++++++++++++++++ .../client/workflow/task/human_task.py | 12 +-- .../client/workflow/task/task_type.py | 1 + .../workflow/task/wait_for_webhook_task.py | 44 +++++++++++ 5 files changed, 129 insertions(+), 6 deletions(-) create mode 100644 examples/wait_for_webhook.py create mode 100644 src/conductor/client/workflow/task/wait_for_webhook_task.py diff --git a/examples/copilot/open_ai_copilot.py b/examples/copilot/open_ai_copilot.py index 61ae19d7..5411c6fd 100644 --- a/examples/copilot/open_ai_copilot.py +++ b/examples/copilot/open_ai_copilot.py @@ -77,7 +77,7 @@ def create_workflow(steps: list[str], inputs: dict[str, object]) -> dict: for step in steps: if step == 'review': - task = HumanTask(task_ref_name='review') + task = HumanTask(task_ref_name='review', display_name='review email', form_version=0, form_template='email_review') task.input_parameters.update(inputs[step]) workflow >> task else: diff --git a/examples/wait_for_webhook.py b/examples/wait_for_webhook.py new file mode 100644 index 00000000..cc2032d6 --- /dev/null +++ b/examples/wait_for_webhook.py @@ -0,0 +1,76 @@ +from conductor.client.automator.task_handler import TaskHandler +from conductor.client.configuration.configuration import Configuration +from conductor.client.http.models import StartWorkflowRequest +from conductor.client.orkes_clients import OrkesClients +from conductor.client.worker.worker_task import worker_task +from conductor.client.workflow.conductor_workflow import ConductorWorkflow +from conductor.client.workflow.task.simple_task import SimpleTask +from conductor.client.workflow.task.wait_for_webhook_task import wait_for_webhook + + +@worker_task(task_definition_name='get_user_email') +def get_user_email(userid: str) -> str: + return f'{userid}@example.com' + + +@worker_task(task_definition_name='send_email') +def send_email(email: str, subject: str, body: str): + print(f'sending email to {email} with subject {subject} and body {body}') + + +def main(): + api_config = Configuration() + + task_handler = TaskHandler( + workers=[], + configuration=api_config, + scan_for_annotated_workers=True, + ) + task_handler.start_processes() + + clients = OrkesClients(configuration=api_config) + workflow_executor = clients.get_workflow_executor() + workflow_client = clients.get_workflow_client() + + workflow = ConductorWorkflow(name='wait_for_webhook', version=1, executor=workflow_executor) + get_email = get_user_email(task_ref_name='get_user_email_ref', userid=workflow.input('userid')) + sendmail = send_email(task_ref_name='send_email_ref', email=get_email.output('result'), subject='Hello from Orkes', + body='Test Email') + + workflow >> get_email >> sendmail >> wait_for_webhook(task_ref_name="wait_ref", matches={ + "$['type']": "customer", + "$['id']": workflow.input("userid") + }) + + # webhook workflows MUST be registered before they can be used with a webhook + workflow.register(overwrite=True) + print(f'done registering workflow...') + + # create a webhook in the UI by navigating to Webhook and creating one that responds to the webhook events + # Ensure that the webhook is configured to receive events and dispatch to the workflow that is created above + # docs + # https://orkes.io/content/reference-docs/system-tasks/wait-for-webhook + + request = StartWorkflowRequest(input={'userid': 'user_a'}) + request.name = workflow.name + request.version = workflow.version + + workflow_run = workflow_client.execute_workflow(start_workflow_request=request, wait_for_seconds=60) + + # execute method will wait until the webhook task is completed, use the following cURL as sample + """ + curl --location 'http://localhost:8080/webhook/YOUR_WEBHOOK_ID' \ + --header 'a: b' \ + --header 'Content-Type: application/json' \ + --data '{ + "id": "user_a", + "type": "customer" + }' + """ + + print(f'workflow execution {workflow_run.workflow_id}') + task_handler.stop_processes() + + +if __name__ == '__main__': + main() diff --git a/src/conductor/client/workflow/task/human_task.py b/src/conductor/client/workflow/task/human_task.py index 43c0f68d..24e28044 100644 --- a/src/conductor/client/workflow/task/human_task.py +++ b/src/conductor/client/workflow/task/human_task.py @@ -37,10 +37,12 @@ def __init__(self, task_ref_name: str, task_type=TaskType.HUMAN ) self.input_parameters.update({ - 'assignmentCompletionStrategy': assignment_completion_strategy.name, - 'displayName': display_name, - 'userFormTemplate': { - 'name': form_template, - 'version': form_version + '__humanTaskDefinition': { + 'assignmentCompletionStrategy': assignment_completion_strategy.name, + 'displayName': display_name, + 'userFormTemplate': { + 'name': form_template, + 'version': form_version + } } }) diff --git a/src/conductor/client/workflow/task/task_type.py b/src/conductor/client/workflow/task/task_type.py index d10e85b6..7c68878c 100644 --- a/src/conductor/client/workflow/task/task_type.py +++ b/src/conductor/client/workflow/task/task_type.py @@ -14,6 +14,7 @@ class TaskType(str, Enum): START_WORKFLOW = 'START_WORKFLOW' EVENT = 'EVENT' WAIT = 'WAIT' + WAIT_FOR_WEBHOOK = 'WAIT_FOR_WEBHOOK' HUMAN = 'HUMAN' USER_DEFINED = 'USER_DEFINED' HTTP = 'HTTP' diff --git a/src/conductor/client/workflow/task/wait_for_webhook_task.py b/src/conductor/client/workflow/task/wait_for_webhook_task.py new file mode 100644 index 00000000..50187c25 --- /dev/null +++ b/src/conductor/client/workflow/task/wait_for_webhook_task.py @@ -0,0 +1,44 @@ +from abc import ABC + +from typing_extensions import Self + +from conductor.client.workflow.task.task import TaskInterface +from conductor.client.workflow.task.task_type import TaskType + + +class WaitForWebHookTask(TaskInterface, ABC): + + def __init__(self, task_ref_name: str, matches: dict[str, object]) -> Self: + """ + matches: dictionary of matching payload that acts as correction between the incoming webhook payload and a + running workflow task - amongst all the running workflows. + + example: + if the matches is specified as below: + + { + "$['type']": "customer_created", + "$['customer_id']": "${workflow.input.customer_id}" + } + + for an incoming webhook request with the payload like: + { + "type": "customer_created", + "customer_id": "customer_123" + } + + The system will find a matching workflow task that is in progress matching the type and customer id and complete + the task. + """ + super().__init__( + task_reference_name=task_ref_name, + task_type=TaskType.WAIT_FOR_WEBHOOK + ) + self.input_parameters['matches'] = matches + + +def wait_for_webhook(task_ref_name: str, matches: dict[str, object], task_def_name: str = None) -> TaskInterface: + task = WaitForWebHookTask(task_ref_name=task_ref_name, matches=matches) + if task_def_name is not None: + task.name = task_def_name + return task