Skip to content

Commit

Permalink
Webhook task (#245)
Browse files Browse the repository at this point in the history
support for Webhook Task
  • Loading branch information
v1r3n authored Jan 31, 2024
1 parent 61c91c5 commit f7d0216
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 6 deletions.
2 changes: 1 addition & 1 deletion examples/copilot/open_ai_copilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def create_workflow(steps: list[str], inputs: dict[str, object]) -> dict:

for step in steps:
if step == 'review':
task = HumanTask(task_ref_name='review')
task = HumanTask(task_ref_name='review', display_name='review email', form_version=0, form_template='email_review')
task.input_parameters.update(inputs[step])
workflow >> task
else:
Expand Down
76 changes: 76 additions & 0 deletions examples/wait_for_webhook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.http.models import StartWorkflowRequest
from conductor.client.orkes_clients import OrkesClients
from conductor.client.worker.worker_task import worker_task
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.task.simple_task import SimpleTask
from conductor.client.workflow.task.wait_for_webhook_task import wait_for_webhook


@worker_task(task_definition_name='get_user_email')
def get_user_email(userid: str) -> str:
return f'{userid}@example.com'


@worker_task(task_definition_name='send_email')
def send_email(email: str, subject: str, body: str):
print(f'sending email to {email} with subject {subject} and body {body}')


def main():
api_config = Configuration()

task_handler = TaskHandler(
workers=[],
configuration=api_config,
scan_for_annotated_workers=True,
)
task_handler.start_processes()

clients = OrkesClients(configuration=api_config)
workflow_executor = clients.get_workflow_executor()
workflow_client = clients.get_workflow_client()

workflow = ConductorWorkflow(name='wait_for_webhook', version=1, executor=workflow_executor)
get_email = get_user_email(task_ref_name='get_user_email_ref', userid=workflow.input('userid'))
sendmail = send_email(task_ref_name='send_email_ref', email=get_email.output('result'), subject='Hello from Orkes',
body='Test Email')

workflow >> get_email >> sendmail >> wait_for_webhook(task_ref_name="wait_ref", matches={
"$['type']": "customer",
"$['id']": workflow.input("userid")
})

# webhook workflows MUST be registered before they can be used with a webhook
workflow.register(overwrite=True)
print(f'done registering workflow...')

# create a webhook in the UI by navigating to Webhook and creating one that responds to the webhook events
# Ensure that the webhook is configured to receive events and dispatch to the workflow that is created above
# docs
# https://orkes.io/content/reference-docs/system-tasks/wait-for-webhook

request = StartWorkflowRequest(input={'userid': 'user_a'})
request.name = workflow.name
request.version = workflow.version

workflow_run = workflow_client.execute_workflow(start_workflow_request=request, wait_for_seconds=60)

# execute method will wait until the webhook task is completed, use the following cURL as sample
"""
curl --location 'http://localhost:8080/webhook/YOUR_WEBHOOK_ID' \
--header 'a: b' \
--header 'Content-Type: application/json' \
--data '{
"id": "user_a",
"type": "customer"
}'
"""

print(f'workflow execution {workflow_run.workflow_id}')
task_handler.stop_processes()


if __name__ == '__main__':
main()
12 changes: 7 additions & 5 deletions src/conductor/client/workflow/task/human_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ def __init__(self, task_ref_name: str,
task_type=TaskType.HUMAN
)
self.input_parameters.update({
'assignmentCompletionStrategy': assignment_completion_strategy.name,
'displayName': display_name,
'userFormTemplate': {
'name': form_template,
'version': form_version
'__humanTaskDefinition': {
'assignmentCompletionStrategy': assignment_completion_strategy.name,
'displayName': display_name,
'userFormTemplate': {
'name': form_template,
'version': form_version
}
}
})
1 change: 1 addition & 0 deletions src/conductor/client/workflow/task/task_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class TaskType(str, Enum):
START_WORKFLOW = 'START_WORKFLOW'
EVENT = 'EVENT'
WAIT = 'WAIT'
WAIT_FOR_WEBHOOK = 'WAIT_FOR_WEBHOOK'
HUMAN = 'HUMAN'
USER_DEFINED = 'USER_DEFINED'
HTTP = 'HTTP'
Expand Down
44 changes: 44 additions & 0 deletions src/conductor/client/workflow/task/wait_for_webhook_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from abc import ABC

from typing_extensions import Self

from conductor.client.workflow.task.task import TaskInterface
from conductor.client.workflow.task.task_type import TaskType


class WaitForWebHookTask(TaskInterface, ABC):

def __init__(self, task_ref_name: str, matches: dict[str, object]) -> Self:
"""
matches: dictionary of matching payload that acts as correction between the incoming webhook payload and a
running workflow task - amongst all the running workflows.
example:
if the matches is specified as below:
{
"$['type']": "customer_created",
"$['customer_id']": "${workflow.input.customer_id}"
}
for an incoming webhook request with the payload like:
{
"type": "customer_created",
"customer_id": "customer_123"
}
The system will find a matching workflow task that is in progress matching the type and customer id and complete
the task.
"""
super().__init__(
task_reference_name=task_ref_name,
task_type=TaskType.WAIT_FOR_WEBHOOK
)
self.input_parameters['matches'] = matches


def wait_for_webhook(task_ref_name: str, matches: dict[str, object], task_def_name: str = None) -> TaskInterface:
task = WaitForWebHookTask(task_ref_name=task_ref_name, matches=matches)
if task_def_name is not None:
task.name = task_def_name
return task

0 comments on commit f7d0216

Please sign in to comment.