From 61c91c57d5dd8d9c155e08137a7ed605663ffd67 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Wed, 24 Jan 2024 10:00:45 -0800 Subject: [PATCH] Update workflow state api (#244) api for updating workflow state api --- examples/copilot/__init__.py | 0 examples/copilot/customer.py | 9 + examples/copilot/open_ai_copilot.py | 236 ++++++++++++++++++ examples/sync_updates.py | 54 ++++ examples/workflow_rerun.py | 56 +++++ .../client/http/api/workflow_resource_api.py | 127 ++++++++++ .../http/models/workflow_state_update.py | 153 ++++++++++++ .../client/orkes/orkes_workflow_client.py | 13 +- .../client/workflow/task/human_task.py | 36 ++- src/conductor/client/workflow_client.py | 8 +- 10 files changed, 689 insertions(+), 3 deletions(-) create mode 100644 examples/copilot/__init__.py create mode 100644 examples/copilot/customer.py create mode 100644 examples/copilot/open_ai_copilot.py create mode 100644 examples/sync_updates.py create mode 100644 examples/workflow_rerun.py create mode 100644 src/conductor/client/http/models/workflow_state_update.py diff --git a/examples/copilot/__init__.py b/examples/copilot/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/copilot/customer.py b/examples/copilot/customer.py new file mode 100644 index 00000000..1e1837d8 --- /dev/null +++ b/examples/copilot/customer.py @@ -0,0 +1,9 @@ +from dataclasses import dataclass + + +@dataclass +class Customer: + id: int + name: str + annual_spend: float + country: str \ No newline at end of file diff --git a/examples/copilot/open_ai_copilot.py b/examples/copilot/open_ai_copilot.py new file mode 100644 index 00000000..61ae19d7 --- /dev/null +++ b/examples/copilot/open_ai_copilot.py @@ -0,0 +1,236 @@ +import json +import os +import random +import string +from typing import List + +from conductor.client.ai.configuration import LLMProvider +from conductor.client.ai.integrations import OpenAIConfig +from conductor.client.ai.orchestrator import AIOrchestrator +from conductor.client.automator.task_handler import TaskHandler +from conductor.client.configuration.configuration import Configuration +from conductor.client.http.models import TaskDef, TaskResult +from conductor.client.http.models.task_result_status import TaskResultStatus +from conductor.client.http.models.workflow_state_update import WorkflowStateUpdate +from conductor.client.orkes_clients import OrkesClients +from conductor.client.worker.worker_task import worker_task +from conductor.client.workflow.conductor_workflow import ConductorWorkflow +from conductor.client.workflow.task.dynamic_task import DynamicTask +from conductor.client.workflow.task.human_task import HumanTask +from conductor.client.workflow.task.llm_tasks.llm_chat_complete import LlmChatComplete, ChatMessage +from conductor.client.workflow.task.simple_task import SimpleTask +from conductor.client.workflow.task.sub_workflow_task import SubWorkflowTask +from conductor.client.workflow.task.switch_task import SwitchTask +from conductor.client.workflow.task.timeout_policy import TimeoutPolicy +from conductor.client.workflow.task.wait_task import WaitTask +from customer import Customer + + +def start_workers(api_config): + task_handler = TaskHandler( + workers=[], + configuration=api_config, + scan_for_annotated_workers=True, + ) + task_handler.start_processes() + return task_handler + + +@worker_task(task_definition_name='get_customer_list') +def get_customer_list() -> List[Customer]: + customers = [] + for i in range(100): + customer_name = 
''.join(random.choices(string.ascii_uppercase +
+                                                string.digits, k=5))
+        spend = random.randint(a=100000, b=9000000)
+        customers.append(
+            Customer(id=i, name='Customer ' + customer_name,
+                     annual_spend=spend,
+                     country='US')
+        )
+    return customers
+
+
+@worker_task(task_definition_name='get_top_n')
+def get_top_n_customers(n: int, customers: List[Customer]) -> List[Customer]:
+    customers.sort(key=lambda x: x.annual_spend, reverse=True)
+    end = min(n, len(customers))
+    return customers[:end]
+
+
+@worker_task(task_definition_name='generate_promo_code')
+def generate_promo_code() -> str:
+    res = ''.join(random.choices(string.ascii_uppercase +
+                                 string.digits, k=5))
+    return res
+
+
+@worker_task(task_definition_name='send_email')
+def send_email(customer: list[Customer], promo_code: str) -> str:
+    return f'Sent {promo_code} to {len(customer)} customers'
+
+
+@worker_task(task_definition_name='create_workflow')
+def create_workflow(steps: list[str], inputs: dict[str, object]) -> dict:
+    executor = OrkesClients().get_workflow_executor()
+    workflow = ConductorWorkflow(executor=executor, name='copilot_execution', version=1)
+
+    for step in steps:
+        if step == 'review':
+            task = HumanTask(task_ref_name='review')
+            task.input_parameters.update(inputs[step])
+            workflow >> task
+        else:
+            task = SimpleTask(task_reference_name=step, task_def_name=step)
+            task.input_parameters.update(inputs[step])
+            workflow >> task
+
+    workflow.register(overwrite=True)
+    print(f'\n\n\nRegistered workflow by name {workflow.name}\n')
+    return workflow.to_workflow_def().toJSON()
+
+
+def main():
+    llm_provider = 'open_ai_' + os.getlogin()
+    chat_complete_model = 'gpt-4'
+    api_config = Configuration()
+    clients = OrkesClients(configuration=api_config)
+    workflow_executor = clients.get_workflow_executor()
+    metadata_client = clients.get_metadata_client()
+    workflow_client = clients.get_workflow_client()
+    task_handler = start_workers(api_config=api_config)
+
+    # register our two tasks
+    metadata_client.register_task_def(task_def=TaskDef(name='get_weather'))
+    metadata_client.register_task_def(task_def=TaskDef(name='get_price_from_amazon'))
+
+    # Define and associate prompt with the AI integration
+    prompt_name = 'chat_function_instructions'
+    prompt_text = """
+    You are a helpful assistant that can answer questions using the tools provided.
+    You have the following tools specified as functions in python:
+    1. get_customer_list() -> List[Customer] (useful to get the list of customers / all the customers / customers)
+    2. generate_promo_code() -> str (useful to generate a promo code for the customer)
+    3. send_email(customer: Customer, promo_code: str) (useful when sending an email to a customer, promo code is the output of the generate_promo_code function)
+    4. get_top_n(n: int, customers: List[Customer]) -> List[Customer]
+    (
+    useful to get the top N customers based on their spend.
+    customers as input can come from the output of get_customer_list function using ${get_customer_list.output.result}
+    reference.
+    This function needs a list of customers as input to get the top N.
+    ).
+    5. create_workflow(steps: List[str], inputs: dict[str, dict]) -> dict
+    (Useful to chain the function calls.
+    inputs are:
+        steps: which is the list of python functions to be executed
+        inputs: a dictionary with key as the function name and value as the dictionary object that is given as the input
+        to the function when calling
+    ).
+    6. 
review(input: str) (useful when you want a human to review something)
+    Note: if you have to execute multiple steps, you MUST use the create_workflow function.
+    Do not call a function from another function to chain them.
+
+    When asked a question, you can use one of these functions to answer the question if required.
+
+    If you have to call these functions, respond with Python code that will call this function.
+    Make sure, when you have to call a function, to return the response in the following valid JSON format that can be parsed directly as a JSON object:
+    {
+        "type": "function",
+        "function": "ACTUAL_PYTHON_FUNCTION_NAME_TO_CALL_WITHOUT_PARAMETERS",
+        "function_parameters": "PARAMETERS FOR THE FUNCTION as a JSON map with key as parameter name and value as parameter value"
+    }
+
+    Rule: Think about the steps to do this, but your output MUST be the above JSON formatted response.
+    ONLY send the JSON response - nothing else!
+
+    """
+    open_ai_config = OpenAIConfig()
+
+    orchestrator = AIOrchestrator(api_configuration=api_config)
+    orchestrator.add_ai_integration(ai_integration_name=llm_provider, provider=LLMProvider.OPEN_AI,
+                                    models=[chat_complete_model],
+                                    description='openai config',
+                                    config=open_ai_config)
+
+    orchestrator.add_prompt_template(prompt_name, prompt_text, 'chat instructions')
+
+    # associate the prompt with the model
+    orchestrator.associate_prompt_template(prompt_name, llm_provider, [chat_complete_model])
+
+    wf = ConductorWorkflow(name='my_function_chatbot', version=1, executor=workflow_executor)
+
+    user_input = WaitTask(task_ref_name='get_user_input')
+
+    chat_complete = LlmChatComplete(task_ref_name='chat_complete_ref',
+                                    llm_provider=llm_provider, model=chat_complete_model,
+                                    instructions_template=prompt_name,
+                                    messages=[
+                                        ChatMessage(role='user',
+                                                    message=user_input.output('query'))
+                                    ],
+                                    max_tokens=2048)
+
+    function_call = DynamicTask(task_reference_name='fn_call_ref', dynamic_task='SUB_WORKFLOW')
+    function_call.input_parameters['steps'] = chat_complete.output('function_parameters.steps')
+    function_call.input_parameters['inputs'] = chat_complete.output('function_parameters.inputs')
+    function_call.input_parameters['subWorkflowName'] = 'copilot_execution'
+    function_call.input_parameters['subWorkflowVersion'] = 1
+
+    sub_workflow = SubWorkflowTask(task_ref_name='execute_workflow', workflow_name='copilot_execution', version=1)
+
+    create = create_workflow(task_ref_name='create_workflow', steps=chat_complete.output('function_parameters.steps'),
+                             inputs=chat_complete.output('function_parameters.inputs'))
+    call_function = SwitchTask(task_ref_name='to_call_or_not', case_expression=chat_complete.output('function'))
+    call_function.switch_case('create_workflow', [create, sub_workflow])
+
+    call_one_fun = DynamicTask(task_reference_name='call_one_fun_ref', dynamic_task=chat_complete.output('function'))
+    call_one_fun.input_parameters['inputs'] = chat_complete.output('function_parameters')
+    call_one_fun.input_parameters['dynamicTaskInputParam'] = 'inputs'
+
+    call_function.default_case([call_one_fun])
+
+    wf >> user_input >> chat_complete >> call_function
+
+    # let's make sure we don't run it for more than 2 minutes -- avoid runaway loops
+    wf.timeout_seconds(120).timeout_policy(timeout_policy=TimeoutPolicy.TIME_OUT_WORKFLOW)
+    message = """
+    I am a helpful bot that can help with your customer management.
+
+    Here are some examples:
+
+    1. Get me the list of top N customers
+    2. Get the list of all the customers
+    3. 
Get the list of top N customers and send them a promo code + """ + print(message) + workflow_run = wf.execute(wait_until_task_ref=user_input.task_reference_name, wait_for_seconds=120) + workflow_id = workflow_run.workflow_id + query = input('>> ') + input_task = workflow_run.get_task(task_reference_name=user_input.task_reference_name) + workflow_run = workflow_client.update_state(workflow_id=workflow_id, + update_requesst=WorkflowStateUpdate( + task_reference_name=user_input.task_reference_name, + task_result=TaskResult(task_id=input_task.task_id, output_data={ + 'query': query + }, status=TaskResultStatus.COMPLETED) + ), + wait_for_seconds=30) + + task_handler.stop_processes() + output = json.dumps(workflow_run.output['result'], indent=3) + print(f""" + + {output} + + """) + + print(f""" + See the complete execution graph here: + + {api_config.host.replace("/api", "")}/execution/{workflow_id} + + """) + + +if __name__ == '__main__': + main() diff --git a/examples/sync_updates.py b/examples/sync_updates.py new file mode 100644 index 00000000..bffc6d70 --- /dev/null +++ b/examples/sync_updates.py @@ -0,0 +1,54 @@ +from conductor.client.automator.task_handler import TaskHandler +from conductor.client.configuration.configuration import Configuration +from conductor.client.http.models import StartWorkflowRequest, TaskResult +from conductor.client.http.models.task_result_status import TaskResultStatus +from conductor.client.http.models.workflow_state_update import WorkflowStateUpdate +from conductor.client.orkes_clients import OrkesClients +from conductor.client.worker.worker_task import worker_task +from conductor.client.workflow.conductor_workflow import ConductorWorkflow +from conductor.client.workflow.task.http_task import HttpTask +from conductor.client.workflow.task.javascript_task import JavascriptTask +from conductor.client.workflow.task.json_jq_task import JsonJQTask +from conductor.client.workflow.task.set_variable_task import SetVariableTask +from conductor.client.workflow.task.switch_task import SwitchTask +from conductor.client.workflow.task.terminate_task import TerminateTask, WorkflowStatus +from conductor.client.workflow.task.wait_task import WaitTask + + +def main(): + api_config = Configuration() + + clients = OrkesClients(configuration=api_config) + workflow_client = clients.get_workflow_client() + + request = StartWorkflowRequest() + request.name = 'sync_task_variable_updates' + request.version = 1 + workflow_run = workflow_client.execute_workflow(start_workflow_request=request, wait_for_seconds=10, + wait_until_task_ref='wait_task_ref') + print(f'started {workflow_run.workflow_id}') + + task_result = TaskResult() + task_result.status = TaskResultStatus.COMPLETED + + state_update = WorkflowStateUpdate() + state_update.task_reference_name = 'wait_task_ref' + state_update.task_result = task_result + state_update.variables = { + 'case': 'case1' + } + + workflow_run = workflow_client.update_state(workflow_id=workflow_run.workflow_id, update_requesst=state_update, + wait_for_seconds=1, + wait_until_task_ref_names=['wait_task_ref_1', 'wait_task_ref_2']) + last_task_ref = workflow_run.tasks[len(workflow_run.tasks) - 1].reference_task_name + print(f'workflow: {workflow_run.status}, last task = {last_task_ref}') + + state_update.task_reference_name = last_task_ref + workflow_run = workflow_client.update_state(workflow_id=workflow_run.workflow_id, update_requesst=state_update, + wait_for_seconds=1) + print(f'workflow: {workflow_run.status}, last task = {last_task_ref}') + + +if 
__name__ == '__main__':
+    main()
diff --git a/examples/workflow_rerun.py b/examples/workflow_rerun.py
new file mode 100644
index 00000000..91ebb43e
--- /dev/null
+++ b/examples/workflow_rerun.py
@@ -0,0 +1,56 @@
+import time
+import uuid
+
+from conductor.client.configuration.configuration import Configuration
+from conductor.client.http.models import StartWorkflowRequest, RerunWorkflowRequest, TaskResult, WorkflowRun
+from conductor.client.http.models.task_result_status import TaskResultStatus
+from conductor.client.http.models.workflow_state_update import WorkflowStateUpdate
+from conductor.client.orkes_clients import OrkesClients
+from conductor.client.workflow.conductor_workflow import ConductorWorkflow
+from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor
+from conductor.client.workflow.task.http_task import HttpTask
+from conductor.client.workflow.task.wait_task import WaitTask
+from conductor.client.workflow_client import WorkflowClient
+
+
+def start_workflow(workflow_client: WorkflowClient) -> WorkflowRun:
+    request = StartWorkflowRequest()
+    request.name = 'rerun_test'
+    request.version = 1
+    request.input = {
+        'case': 'case1'
+    }
+    return workflow_client.execute_workflow(start_workflow_request=request, wait_until_task_ref='simple_task_ref1_case1_1')
+
+
+def main():
+    api_config = Configuration()
+    clients = OrkesClients(configuration=api_config)
+    workflow_client = clients.get_workflow_client()
+    task_client = clients.get_task_client()
+
+    workflow_run = start_workflow(workflow_client)
+    workflow_id = workflow_run.workflow_id
+    print(f'started workflow with id {workflow_id}')
+    print(f'You can monitor the workflow in the UI here: http://localhost:5001/execution/{workflow_id}')
+
+
+
+    update_request = WorkflowStateUpdate()
+    update_request.task_reference_name = 'simple_task_ref1_case1_1'
+    update_request.task_result = TaskResult()
+    update_request.task_result.status = TaskResultStatus.COMPLETED
+    workflow_client.update_state(workflow_id=workflow_id, update_requesst=update_request, wait_until_task_ref_names=['simple_task_ref1_case1_2'], wait_for_seconds=0)
+
+    update_request.task_reference_name = 'simple_task_ref1_case1_2'
+    workflow_run = workflow_client.update_state(workflow_id=workflow_id, update_requesst=update_request,
+                                                wait_until_task_ref_names=['simple_task_ref2_case1_1'], wait_for_seconds=0)
+
+    task = workflow_run.get_task(task_reference_name='simple_task_ref1_case1_2')
+    rerun_request = RerunWorkflowRequest()
+    rerun_request.re_run_from_task_id = task.task_id
+    workflow_client.rerun_workflow(workflow_id=workflow_id, rerun_workflow_request=rerun_request)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/src/conductor/client/http/api/workflow_resource_api.py b/src/conductor/client/http/api/workflow_resource_api.py
index e11764b2..ed30f888 100644
--- a/src/conductor/client/http/api/workflow_resource_api.py
+++ b/src/conductor/client/http/api/workflow_resource_api.py
@@ -1,6 +1,7 @@
 from __future__ import absolute_import
 
 import re  # noqa: F401
+import uuid
 
 # python 2 and python 3 compatibility library
 import six
@@ -197,6 +198,8 @@ def delete1_with_http_info(self, workflow_id, **kwargs):  # noqa: E501
             collection_formats=collection_formats)
 
     def execute_workflow(self, body, request_id, name, version, **kwargs):  # noqa: E501
+        if request_id is None:
+            request_id = str(uuid.uuid4())
         """Execute a workflow synchronously  # noqa: E501
 
         This method makes a synchronous HTTP request by default. 
To make an
@@ -2925,3 +2928,127 @@ def upgrade_running_workflow_to_version_with_http_info(self, body, workflow_id,
             _preload_content=params.get('_preload_content', True),
             _request_timeout=params.get('_request_timeout'),
             collection_formats=collection_formats)
+
+    def update_workflow_and_task_state(self, update_requesst, workflow_id, **kwargs):  # noqa: E501
+        """Update a workflow state by updating variables or an in-progress task  # noqa: E501
+
+        Updates the workflow variables, tasks and triggers evaluation.  # noqa: E501
+        This method makes a synchronous HTTP request by default. To make an
+        asynchronous HTTP request, please pass async_req=True
+        >>> thread = api.update_workflow_and_task_state(update_requesst, workflow_id, async_req=True)
+        >>> result = thread.get()
+
+        :param async_req bool
+        :param WorkflowStateUpdate body: (required)
+        :param str request_id: (required)
+        :param str workflow_id: (required)
+        :param str wait_until_task_ref:
+        :param int wait_for_seconds:
+        :return: WorkflowRun
+                 If the method is called asynchronously,
+                 returns the request thread.
+        """
+        request_id = str(uuid.uuid4())
+        kwargs['_return_http_data_only'] = True
+        if kwargs.get('async_req'):
+            return self.update_workflow_and_task_state_with_http_info(update_requesst, request_id, workflow_id, **kwargs)  # noqa: E501
+        else:
+            (data) = self.update_workflow_and_task_state_with_http_info(update_requesst, request_id, workflow_id, **kwargs)  # noqa: E501
+            return data
+
+    def update_workflow_and_task_state_with_http_info(self, body, request_id, workflow_id, **kwargs):  # noqa: E501
+        """Update a workflow state by updating variables or an in-progress task  # noqa: E501
+
+        Updates the workflow variables, tasks and triggers evaluation.  # noqa: E501
+        This method makes a synchronous HTTP request by default. To make an
+        asynchronous HTTP request, please pass async_req=True
+        >>> thread = api.update_workflow_and_task_state_with_http_info(body, request_id, workflow_id, async_req=True)
+        >>> result = thread.get()
+
+        :param async_req bool
+        :param WorkflowStateUpdate body: (required)
+        :param str request_id: (required)
+        :param str workflow_id: (required)
+        :param str wait_until_task_ref:
+        :param int wait_for_seconds:
+        :return: WorkflowRun
+                 If the method is called asynchronously,
+                 returns the request thread. 
+ """ + + all_params = ['body', 'request_id', 'workflow_id', 'wait_until_task_ref', 'wait_for_seconds'] # noqa: E501 + all_params.append('async_req') + all_params.append('_return_http_data_only') + all_params.append('_preload_content') + all_params.append('_request_timeout') + + params = locals() + for key, val in six.iteritems(params['kwargs']): + if key not in all_params: + raise TypeError( + "Got an unexpected keyword argument '%s'" + " to method update_workflow_and_task_state" % key + ) + params[key] = val + del params['kwargs'] + # verify the required parameter 'body' is set + if ('body' not in params or + params['body'] is None): + raise ValueError("Missing the required parameter `body` when calling `update_workflow_and_task_state`") # noqa: E501 + # verify the required parameter 'request_id' is set + if ('request_id' not in params or + params['request_id'] is None): + raise ValueError("Missing the required parameter `request_id` when calling `update_workflow_and_task_state`") # noqa: E501 + # verify the required parameter 'workflow_id' is set + if ('workflow_id' not in params or + params['workflow_id'] is None): + raise ValueError("Missing the required parameter `workflow_id` when calling `update_workflow_and_task_state`") # noqa: E501 + + collection_formats = {} + + path_params = {} + if 'workflow_id' in params: + path_params['workflowId'] = params['workflow_id'] # noqa: E501 + + query_params = [] + if 'request_id' in params: + query_params.append(('requestId', params['request_id'])) # noqa: E501 + if 'wait_until_task_ref' in params: + query_params.append(('waitUntilTaskRef', params['wait_until_task_ref'])) # noqa: E501 + if 'wait_for_seconds' in params: + query_params.append(('waitForSeconds', params['wait_for_seconds'])) # noqa: E501 + + header_params = {} + + form_params = [] + local_var_files = {} + + body_params = None + if 'body' in params: + body_params = params['body'] + # HTTP header `Accept` + header_params['Accept'] = self.api_client.select_header_accept( + ['*/*']) # noqa: E501 + + # HTTP header `Content-Type` + header_params['Content-Type'] = self.api_client.select_header_content_type( # noqa: E501 + ['application/json']) # noqa: E501 + + # Authentication setting + auth_settings = ['api_key'] # noqa: E501 + + return self.api_client.call_api( + '/workflow/{workflowId}/state', 'POST', + path_params, + query_params, + header_params, + body=body_params, + post_params=form_params, + files=local_var_files, + response_type='WorkflowRun', # noqa: E501 + auth_settings=auth_settings, + async_req=params.get('async_req'), + _return_http_data_only=params.get('_return_http_data_only'), + _preload_content=params.get('_preload_content', True), + _request_timeout=params.get('_request_timeout'), + collection_formats=collection_formats) \ No newline at end of file diff --git a/src/conductor/client/http/models/workflow_state_update.py b/src/conductor/client/http/models/workflow_state_update.py new file mode 100644 index 00000000..9769a7de --- /dev/null +++ b/src/conductor/client/http/models/workflow_state_update.py @@ -0,0 +1,153 @@ +import pprint +import re # noqa: F401 + +import six + +from conductor.client.http.models import TaskResult + + +class WorkflowStateUpdate(object): + """NOTE: This class is auto generated by the swagger code generator program. + + Do not edit the class manually. + """ + """ + Attributes: + swagger_types (dict): The key is attribute name + and the value is attribute type. 
+ attribute_map (dict): The key is attribute name + and the value is json key in definition. + """ + swagger_types = { + 'task_reference_name': 'str', + 'task_result': 'TaskResult', + 'variables': 'dict(str, object)' + } + + attribute_map = { + 'task_reference_name': 'taskReferenceName', + 'task_result': 'taskResult', + 'variables': 'variables' + } + + def __init__(self, task_reference_name: str = None, task_result: TaskResult = None, + variables: dict[str, object] = None): # noqa: E501 + """WorkflowStateUpdate - a model defined in Swagger""" # noqa: E501 + self._task_reference_name = None + self._task_result = None + self._variables = None + if task_reference_name is not None: + self.task_reference_name = task_reference_name + if task_result is not None: + self.task_result = task_result + if variables is not None: + self.variables = variables + + @property + def task_reference_name(self) -> str: + """Gets the task_reference_name of this WorkflowStateUpdate. # noqa: E501 + + + :return: The task_reference_name of this WorkflowStateUpdate. # noqa: E501 + :rtype: str + """ + return self._task_reference_name + + @task_reference_name.setter + def task_reference_name(self, task_reference_name: str): + """Sets the task_reference_name of this WorkflowStateUpdate. + + + :param task_reference_name: The task_reference_name of this WorkflowStateUpdate. # noqa: E501 + :type: str + """ + + self._task_reference_name = task_reference_name + + @property + def task_result(self) -> TaskResult: + """Gets the task_result of this WorkflowStateUpdate. # noqa: E501 + + + :return: The task_result of this WorkflowStateUpdate. # noqa: E501 + :rtype: TaskResult + """ + return self._task_result + + @task_result.setter + def task_result(self, task_result: TaskResult): + """Sets the task_result of this WorkflowStateUpdate. + + + :param task_result: The task_result of this WorkflowStateUpdate. # noqa: E501 + :type: TaskResult + """ + + self._task_result = task_result + + @property + def variables(self) -> dict[str, object]: + """Gets the variables of this WorkflowStateUpdate. # noqa: E501 + + + :return: The variables of this WorkflowStateUpdate. # noqa: E501 + :rtype: dict(str, object) + """ + return self._variables + + @variables.setter + def variables(self, variables: dict[str, object]): + """Sets the variables of this WorkflowStateUpdate. + + + :param variables: The variables of this WorkflowStateUpdate. 
# noqa: E501 + :type: dict(str, object) + """ + + self._variables = variables + + def to_dict(self): + """Returns the model properties as a dict""" + result = {} + + for attr, _ in six.iteritems(self.swagger_types): + value = getattr(self, attr) + if isinstance(value, list): + result[attr] = list(map( + lambda x: x.to_dict() if hasattr(x, "to_dict") else x, + value + )) + elif hasattr(value, "to_dict"): + result[attr] = value.to_dict() + elif isinstance(value, dict): + result[attr] = dict(map( + lambda item: (item[0], item[1].to_dict()) + if hasattr(item[1], "to_dict") else item, + value.items() + )) + else: + result[attr] = value + if issubclass(WorkflowStateUpdate, dict): + for key, value in self.items(): + result[key] = value + + return result + + def to_str(self): + """Returns the string representation of the model""" + return pprint.pformat(self.to_dict()) + + def __repr__(self): + """For `print` and `pprint`""" + return self.to_str() + + def __eq__(self, other): + """Returns true if both objects are equal""" + if not isinstance(other, WorkflowStateUpdate): + return False + + return self.__dict__ == other.__dict__ + + def __ne__(self, other): + """Returns true if both objects are not equal""" + return not self == other diff --git a/src/conductor/client/orkes/orkes_workflow_client.py b/src/conductor/client/orkes/orkes_workflow_client.py index b0d80c3b..475cfbde 100644 --- a/src/conductor/client/orkes/orkes_workflow_client.py +++ b/src/conductor/client/orkes/orkes_workflow_client.py @@ -8,6 +8,7 @@ from conductor.client.http.models.start_workflow_request import StartWorkflowRequest from conductor.client.http.models.workflow import Workflow from conductor.client.http.models.workflow_run import WorkflowRun +from conductor.client.http.models.workflow_state_update import WorkflowStateUpdate from conductor.client.http.models.workflow_test_request import WorkflowTestRequest from conductor.client.orkes.orkes_base_client import OrkesBaseClient from conductor.client.workflow_client import WorkflowClient @@ -44,7 +45,7 @@ def start_workflow(self, start_workflow_request: StartWorkflowRequest) -> str: def execute_workflow( self, start_workflow_request: StartWorkflowRequest, - request_id: str, + request_id: str = None, wait_until_task_ref: Optional[str] = None, wait_for_seconds: int = 30 ) -> WorkflowRun: @@ -168,3 +169,13 @@ def remove_workflow(self, workflow_id: str): def update_variables(self, workflow_id: str, variables: dict[str, object] = {}) -> None: self.workflowResourceApi.update_workflow_state(variables, workflow_id) + + def update_state(self, workflow_id: str, update_requesst: WorkflowStateUpdate, + wait_until_task_ref_names: List[str] = None, wait_for_seconds: int = None) -> WorkflowRun: + kwargs = {} + if wait_until_task_ref_names is not None: + kwargs['wait_until_task_ref'] = ','.join(wait_until_task_ref_names) + if wait_for_seconds is not None: + kwargs['wait_for_seconds'] = wait_for_seconds + + return self.workflowResourceApi.update_workflow_and_task_state(update_requesst=update_requesst, workflow_id=workflow_id, **kwargs) \ No newline at end of file diff --git a/src/conductor/client/workflow/task/human_task.py b/src/conductor/client/workflow/task/human_task.py index 1ef98d38..43c0f68d 100644 --- a/src/conductor/client/workflow/task/human_task.py +++ b/src/conductor/client/workflow/task/human_task.py @@ -1,12 +1,46 @@ +from enum import Enum + from typing_extensions import Self from conductor.client.workflow.task.task import TaskInterface from conductor.client.workflow.task.task_type 
import TaskType +class AssignmentCompletionStrategy(str, Enum): + LEAVE_OPEN = "LEAVE_OPEN", + TERMINATE = "TERMINATE" + + def __str__(self) -> str: + return self.name.__str__() + + +class TriggerType(str, Enum): + ASSIGNED = "ASSIGNED", + PENDING = "PENDING", + IN_PROGRESS = "IN_PROGRESS", + COMPLETED = "COMPLETED", + TIMED_OUT = "TIMED_OUT", + ASSIGNEE_CHANGED = "ASSIGNEE_CHANGED", + + def __str__(self) -> str: + return self.name.__str__() + + class HumanTask(TaskInterface): - def __init__(self, task_ref_name: str) -> Self: + def __init__(self, task_ref_name: str, + display_name : str = None, + form_template: str = None, form_version : int = 0, + assignment_completion_strategy : AssignmentCompletionStrategy = AssignmentCompletionStrategy.LEAVE_OPEN, + ) -> Self: super().__init__( task_reference_name=task_ref_name, task_type=TaskType.HUMAN ) + self.input_parameters.update({ + 'assignmentCompletionStrategy': assignment_completion_strategy.name, + 'displayName': display_name, + 'userFormTemplate': { + 'name': form_template, + 'version': form_version + } + }) diff --git a/src/conductor/client/workflow_client.py b/src/conductor/client/workflow_client.py index a78b4696..08884250 100644 --- a/src/conductor/client/workflow_client.py +++ b/src/conductor/client/workflow_client.py @@ -7,6 +7,7 @@ from conductor.client.http.models.rerun_workflow_request import RerunWorkflowRequest from conductor.client.http.models.start_workflow_request import StartWorkflowRequest from conductor.client.http.models.workflow import Workflow +from conductor.client.http.models.workflow_state_update import WorkflowStateUpdate from conductor.client.http.models.workflow_test_request import WorkflowTestRequest @@ -37,7 +38,7 @@ def terminate_workflow(self, workflow_id: str, reason: Optional[str] = None, def execute_workflow( self, start_workflow_request: StartWorkflowRequest, - request_id: str, + request_id: str = None, wait_until_task_ref: Optional[str] = None, wait_for_seconds: int = 30 ) -> WorkflowRun: @@ -101,3 +102,8 @@ def remove_workflow(self, workflow_id: str): @abstractmethod def update_variables(self, workflow_id: str, variables: dict[str, object] = {}) -> None: pass + + @abstractmethod + def update_state(self, workflow_id: str, update_requesst: WorkflowStateUpdate, + wait_until_task_ref_names: List[str] = None, wait_for_seconds : int = None) -> WorkflowRun: + pass
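
Usage sketch for the new update_state client API introduced by this patch. This is illustrative only and not part of the diff above; the workflow name 'sync_task_variable_updates' and the task reference names are assumed placeholders (they mirror the definitions expected by examples/sync_updates.py), and a reachable Conductor server is assumed via the default Configuration.

from conductor.client.configuration.configuration import Configuration
from conductor.client.http.models import StartWorkflowRequest, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.http.models.workflow_state_update import WorkflowStateUpdate
from conductor.client.orkes_clients import OrkesClients

api_config = Configuration()
workflow_client = OrkesClients(configuration=api_config).get_workflow_client()

# start a workflow and block until it reaches the wait task
# ('sync_task_variable_updates' and 'wait_task_ref' are assumed example names)
request = StartWorkflowRequest()
request.name = 'sync_task_variable_updates'
request.version = 1
workflow_run = workflow_client.execute_workflow(start_workflow_request=request,
                                                wait_until_task_ref='wait_task_ref',
                                                wait_for_seconds=10)

# complete the wait task and set a workflow variable in one synchronous call
task_result = TaskResult()
task_result.status = TaskResultStatus.COMPLETED

state_update = WorkflowStateUpdate()
state_update.task_reference_name = 'wait_task_ref'
state_update.task_result = task_result
state_update.variables = {'case': 'case1'}

# 'next_task_ref' is an assumed downstream task reference to wait on
workflow_run = workflow_client.update_state(workflow_id=workflow_run.workflow_id,
                                            update_requesst=state_update,
                                            wait_until_task_ref_names=['next_task_ref'],
                                            wait_for_seconds=1)
print(workflow_run.status)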