Skip to content

Commit

Permalink
Merge pull request #149 from conductor-sdk/orkes-conudctor-957-termin…
Browse files Browse the repository at this point in the history
…ate-API-update-triggerFailureWorkflow

orkes-conductor-957: update Terminate API
  • Loading branch information
gardusig authored Feb 11, 2023
2 parents d41a2aa + a7e09a9 commit 8204fed
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 5 deletions.
7 changes: 6 additions & 1 deletion src/conductor/client/http/api/workflow_bulk_resource_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ def terminate(self, body, **kwargs): # noqa: E501
:param async_req bool
:param list[str] body: (required)
:param str reason:
:param bool trigger_failure_workflow:
:return: BulkResponse
If the method is called asynchronously,
returns the request thread.
Expand All @@ -445,12 +446,13 @@ def terminate_with_http_info(self, body, **kwargs): # noqa: E501
:param async_req bool
:param list[str] body: (required)
:param str reason:
:param bool trigger_failure_workflow:
:return: BulkResponse
If the method is called asynchronously,
returns the request thread.
"""

all_params = ['body', 'reason'] # noqa: E501
all_params = ['body', 'reason', 'triggerFailureWorkflow'] # noqa: E501
all_params.append('async_req')
all_params.append('_return_http_data_only')
all_params.append('_preload_content')
Expand Down Expand Up @@ -478,6 +480,9 @@ def terminate_with_http_info(self, body, **kwargs): # noqa: E501
if 'reason' in params:
query_params.append(('reason', params['reason'])) # noqa: E501

if 'triggerFailureWorkflow' in params:
query_params.append(('triggerFailureWorkflow', params['triggerFailureWorkflow'])) # noqa: E501

header_params = {}

form_params = []
Expand Down
7 changes: 6 additions & 1 deletion src/conductor/client/http/api/workflow_resource_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2390,6 +2390,7 @@ def terminate1(self, workflow_id, **kwargs): # noqa: E501
:param async_req bool
:param str workflow_id: (required)
:param str reason:
:param bool trigger_failure_workflow:
:return: None
If the method is called asynchronously,
returns the request thread.
Expand All @@ -2412,12 +2413,13 @@ def terminate1_with_http_info(self, workflow_id, **kwargs): # noqa: E501
:param async_req bool
:param str workflow_id: (required)
:param str reason:
:param bool trigger_failure_workflow:
:return: None
If the method is called asynchronously,
returns the request thread.
"""

all_params = ['workflow_id', 'reason'] # noqa: E501
all_params = ['workflow_id', 'reason', 'triggerFailureWorkflow'] # noqa: E501
all_params.append('async_req')
all_params.append('_return_http_data_only')
all_params.append('_preload_content')
Expand Down Expand Up @@ -2447,6 +2449,9 @@ def terminate1_with_http_info(self, workflow_id, **kwargs): # noqa: E501
if 'reason' in params:
query_params.append(('reason', params['reason'])) # noqa: E501

if 'triggerFailureWorkflow' in params:
query_params.append(('triggerFailureWorkflow', params['triggerFailureWorkflow'])) # noqa: E501

header_params = {}

form_params = []
Expand Down
4 changes: 3 additions & 1 deletion src/conductor/client/workflow/executor/workflow_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,13 @@ def resume(self, workflow_id: str) -> None:
workflow_id=workflow_id
)

def terminate(self, workflow_id: str, reason: str = None) -> None:
def terminate(self, workflow_id: str, reason: str = None, trigger_failure_workflow: bool = None) -> None:
"""Terminate workflow execution"""
kwargs = {}
if reason is not None:
kwargs['reason'] = reason
if trigger_failure_workflow is not None:
kwargs['triggerFailureWorkflow'] = trigger_failure_workflow
return self.workflow_client.terminate1(
workflow_id=workflow_id,
**kwargs
Expand Down
20 changes: 18 additions & 2 deletions tests/integration/workflow/test_workflow_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def test_workflow_methods(
version=1234,
).add(
task
)
).failure_workflow(workflow_name)
workflow_executor.register_workflow(
workflow.to_workflow_def(),
overwrite=True,
Expand All @@ -142,11 +142,15 @@ def test_workflow_methods(
_restart_workflow(workflow_executor, workflow_id)
_terminate_workflow(workflow_executor, workflow_id)
_retry_workflow(workflow_executor, workflow_id)
_terminate_workflow(workflow_executor, workflow_id)
failure_wf_id = _terminate_workflow_with_failure(workflow_executor, workflow_id, True)
_terminate_workflow(workflow_executor, failure_wf_id)
_rerun_workflow(workflow_executor, workflow_id)
workflow_executor.remove_workflow(
workflow_id, archive_workflow=False
)
workflow_executor.remove_workflow(
failure_wf_id, archive_workflow=False
)


def test_workflow_registration(workflow_executor: WorkflowExecutor):
Expand Down Expand Up @@ -266,6 +270,18 @@ def _terminate_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -
f'workflow expected to be TERMINATED, but received {workflow_status.status}, workflow_id: {workflow_id}'
)

def _terminate_workflow_with_failure(workflow_executor: WorkflowExecutor, workflow_id: str, trigger_failure_workflow: bool) -> str:
workflow_executor.terminate(workflow_id, 'test', trigger_failure_workflow)
workflow_status = workflow_executor.get_workflow_status(
workflow_id,
include_output=True,
include_variables=False,
)
if workflow_status.status != 'TERMINATED':
raise Exception(
f'workflow expected to be TERMINATED, but received {workflow_status.status}, workflow_id: {workflow_id}'
)
return workflow_status.output.get('conductor.failure_workflow')

def _restart_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
workflow_executor.restart(workflow_id)
Expand Down

0 comments on commit 8204fed

Please sign in to comment.