diff --git a/src/conductor/client/http/api/workflow_bulk_resource_api.py b/src/conductor/client/http/api/workflow_bulk_resource_api.py index 02674e13..fa6e9022 100644 --- a/src/conductor/client/http/api/workflow_bulk_resource_api.py +++ b/src/conductor/client/http/api/workflow_bulk_resource_api.py @@ -423,6 +423,7 @@ def terminate(self, body, **kwargs): # noqa: E501 :param async_req bool :param list[str] body: (required) :param str reason: + :param bool trigger_failure_workflow: :return: BulkResponse If the method is called asynchronously, returns the request thread. @@ -445,12 +446,13 @@ def terminate_with_http_info(self, body, **kwargs): # noqa: E501 :param async_req bool :param list[str] body: (required) :param str reason: + :param bool trigger_failure_workflow: :return: BulkResponse If the method is called asynchronously, returns the request thread. """ - all_params = ['body', 'reason'] # noqa: E501 + all_params = ['body', 'reason', 'triggerFailureWorkflow'] # noqa: E501 all_params.append('async_req') all_params.append('_return_http_data_only') all_params.append('_preload_content') @@ -478,6 +480,9 @@ def terminate_with_http_info(self, body, **kwargs): # noqa: E501 if 'reason' in params: query_params.append(('reason', params['reason'])) # noqa: E501 + if 'triggerFailureWorkflow' in params: + query_params.append(('triggerFailureWorkflow', params['triggerFailureWorkflow'])) # noqa: E501 + header_params = {} form_params = [] diff --git a/src/conductor/client/http/api/workflow_resource_api.py b/src/conductor/client/http/api/workflow_resource_api.py index 31bc927c..25f033fd 100644 --- a/src/conductor/client/http/api/workflow_resource_api.py +++ b/src/conductor/client/http/api/workflow_resource_api.py @@ -2390,6 +2390,7 @@ def terminate1(self, workflow_id, **kwargs): # noqa: E501 :param async_req bool :param str workflow_id: (required) :param str reason: + :param bool trigger_failure_workflow: :return: None If the method is called asynchronously, returns the request thread. @@ -2412,12 +2413,13 @@ def terminate1_with_http_info(self, workflow_id, **kwargs): # noqa: E501 :param async_req bool :param str workflow_id: (required) :param str reason: + :param bool trigger_failure_workflow: :return: None If the method is called asynchronously, returns the request thread. """ - all_params = ['workflow_id', 'reason'] # noqa: E501 + all_params = ['workflow_id', 'reason', 'triggerFailureWorkflow'] # noqa: E501 all_params.append('async_req') all_params.append('_return_http_data_only') all_params.append('_preload_content') @@ -2447,6 +2449,9 @@ def terminate1_with_http_info(self, workflow_id, **kwargs): # noqa: E501 if 'reason' in params: query_params.append(('reason', params['reason'])) # noqa: E501 + if 'triggerFailureWorkflow' in params: + query_params.append(('triggerFailureWorkflow', params['triggerFailureWorkflow'])) # noqa: E501 + header_params = {} form_params = [] diff --git a/src/conductor/client/workflow/executor/workflow_executor.py b/src/conductor/client/workflow/executor/workflow_executor.py index 9e899dd9..4f4e56f4 100644 --- a/src/conductor/client/workflow/executor/workflow_executor.py +++ b/src/conductor/client/workflow/executor/workflow_executor.py @@ -152,11 +152,13 @@ def resume(self, workflow_id: str) -> None: workflow_id=workflow_id ) - def terminate(self, workflow_id: str, reason: str = None) -> None: + def terminate(self, workflow_id: str, reason: str = None, trigger_failure_workflow: bool = None) -> None: """Terminate workflow execution""" kwargs = {} if reason is not None: kwargs['reason'] = reason + if trigger_failure_workflow is not None: + kwargs['triggerFailureWorkflow'] = trigger_failure_workflow return self.workflow_client.terminate1( workflow_id=workflow_id, **kwargs diff --git a/tests/integration/workflow/test_workflow_execution.py b/tests/integration/workflow/test_workflow_execution.py index a7bd9f08..947c72c7 100644 --- a/tests/integration/workflow/test_workflow_execution.py +++ b/tests/integration/workflow/test_workflow_execution.py @@ -124,7 +124,7 @@ def test_workflow_methods( version=1234, ).add( task - ) + ).failure_workflow(workflow_name) workflow_executor.register_workflow( workflow.to_workflow_def(), overwrite=True, @@ -142,11 +142,15 @@ def test_workflow_methods( _restart_workflow(workflow_executor, workflow_id) _terminate_workflow(workflow_executor, workflow_id) _retry_workflow(workflow_executor, workflow_id) - _terminate_workflow(workflow_executor, workflow_id) + failure_wf_id = _terminate_workflow_with_failure(workflow_executor, workflow_id, True) + _terminate_workflow(workflow_executor, failure_wf_id) _rerun_workflow(workflow_executor, workflow_id) workflow_executor.remove_workflow( workflow_id, archive_workflow=False ) + workflow_executor.remove_workflow( + failure_wf_id, archive_workflow=False + ) def test_workflow_registration(workflow_executor: WorkflowExecutor): @@ -266,6 +270,18 @@ def _terminate_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) - f'workflow expected to be TERMINATED, but received {workflow_status.status}, workflow_id: {workflow_id}' ) +def _terminate_workflow_with_failure(workflow_executor: WorkflowExecutor, workflow_id: str, trigger_failure_workflow: bool) -> str: + workflow_executor.terminate(workflow_id, 'test', trigger_failure_workflow) + workflow_status = workflow_executor.get_workflow_status( + workflow_id, + include_output=True, + include_variables=False, + ) + if workflow_status.status != 'TERMINATED': + raise Exception( + f'workflow expected to be TERMINATED, but received {workflow_status.status}, workflow_id: {workflow_id}' + ) + return workflow_status.output.get('conductor.failure_workflow') def _restart_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None: workflow_executor.restart(workflow_id)