Skip to content

Commit

Permalink
Merge pull request #164 from conductor-sdk/sync_task_update_api
Browse files Browse the repository at this point in the history
update task sync
  • Loading branch information
gardusig authored May 10, 2023
2 parents 365809f + a7bb98d commit 41c035b
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 9 deletions.
127 changes: 127 additions & 0 deletions src/conductor/client/http/api/task_resource_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1595,3 +1595,130 @@ def update_task1_with_http_info(self, body, workflow_id, task_ref_name, status,
_preload_content=params.get('_preload_content', True),
_request_timeout=params.get('_request_timeout'),
collection_formats=collection_formats)

def update_task_sync(self, body, workflow_id, task_ref_name, status, **kwargs): # noqa: E501
"""Update a task By Ref Name # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.update_task_sync(body, workflow_id, task_ref_name, status, async_req=True)
>>> result = thread.get()
:param async_req bool
:param dict(str, object) body: (required)
:param str workflow_id: (required)
:param str task_ref_name: (required)
:param str status: (required)
:param str workerid:
:return: Workflow
If the method is called asynchronously,
returns the request thread.
"""
kwargs['_return_http_data_only'] = True
if kwargs.get('async_req'):
return self.update_task_sync_with_http_info(body, workflow_id, task_ref_name, status, **kwargs) # noqa: E501
else:
(data) = self.update_task_sync_with_http_info(body, workflow_id, task_ref_name, status, **kwargs) # noqa: E501
return data

def update_task_sync_with_http_info(self, body, workflow_id, task_ref_name, status, **kwargs): # noqa: E501
"""Update a task By Ref Name # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.update_task_sync_with_http_info(body, workflow_id, task_ref_name, status, async_req=True)
>>> result = thread.get()
:param async_req bool
:param dict(str, object) body: (required)
:param str workflow_id: (required)
:param str task_ref_name: (required)
:param str status: (required)
:param str workerid:
:return: Workflow
If the method is called asynchronously,
returns the request thread.
"""

all_params = ['body', 'workflow_id', 'task_ref_name', 'status'] # noqa: E501
all_params.append('async_req')
all_params.append('_return_http_data_only')
all_params.append('_preload_content')
all_params.append('_request_timeout')

params = locals()
for key, val in six.iteritems(params['kwargs']):
if key not in all_params:
raise TypeError(
"Got an unexpected keyword argument '%s'"
" to method update_task1" % key
)
params[key] = val
del params['kwargs']
# verify the required parameter 'body' is set
if ('body' not in params or
params['body'] is None):
raise ValueError("Missing the required parameter `body` when calling `update_task1`") # noqa: E501
# verify the required parameter 'workflow_id' is set
if ('workflow_id' not in params or
params['workflow_id'] is None):
raise ValueError("Missing the required parameter `workflow_id` when calling `update_task1`") # noqa: E501
# verify the required parameter 'task_ref_name' is set
if ('task_ref_name' not in params or
params['task_ref_name'] is None):
raise ValueError("Missing the required parameter `task_ref_name` when calling `update_task1`") # noqa: E501
# verify the required parameter 'status' is set
if ('status' not in params or
params['status'] is None):
raise ValueError("Missing the required parameter `status` when calling `update_task1`") # noqa: E501

collection_formats = {}

path_params = {}
if 'workflow_id' in params:
path_params['workflowId'] = params['workflow_id'] # noqa: E501
if 'task_ref_name' in params:
path_params['taskRefName'] = params['task_ref_name'] # noqa: E501
if 'status' in params:
path_params['status'] = params['status'] # noqa: E501

query_params = []

if 'workerid' not in params:
params['workerid'] = socket.gethostname()
query_params.append(('workerid', params['workerid'])) # noqa: E501

header_params = {}

form_params = []
local_var_files = {}

body_params = None
if 'body' in params:
body_params = params['body']
# HTTP header `Accept`
header_params['Accept'] = self.api_client.select_header_accept(
['text/plain']) # noqa: E501

# HTTP header `Content-Type`
header_params['Content-Type'] = self.api_client.select_header_content_type( # noqa: E501
['application/json']) # noqa: E501

# Authentication setting
auth_settings = [] # noqa: E501

return self.api_client.call_api(
'/tasks/{workflowId}/{taskRefName}/{status}/sync', 'POST',
path_params,
query_params,
header_params,
body=body_params,
post_params=form_params,
files=local_var_files,
response_type='Workflow', # noqa: E501
auth_settings=auth_settings,
async_req=params.get('async_req'),
_return_http_data_only=params.get('_return_http_data_only'),
_preload_content=params.get('_preload_content', True),
_request_timeout=params.get('_request_timeout'),
collection_formats=collection_formats)
9 changes: 9 additions & 0 deletions src/conductor/client/workflow/executor/workflow_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,15 @@ def update_task_by_ref_name(self, task_output: Dict[str, Any], workflow_id: str,
status=status,
)

def update_task_by_ref_name_sync(self, task_output: Dict[str, Any], workflow_id: str, task_reference_name: str, status: str) -> Workflow:
"""Update a task By Ref Name"""
return self.task_client.update_task_sync(
body=task_output,
workflow_id=workflow_id,
task_ref_name=task_reference_name,
status=status,
)

def get_task(self, task_id: str) -> str:
"""Get task by Id"""
return self.task_client.get_task(
Expand Down
59 changes: 50 additions & 9 deletions tests/integration/workflow/test_workflow_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,13 @@ def test_workflow_methods(
workflow_executor: WorkflowExecutor,
workflow_quantity: int,
) -> None:
task = SimpleTask(
'python_integration_test_abc1asjdkajskdjsad',
'python_integration_test_abc1asjdkajskdjsad'
)
if workflow_quantity < 1:
return
task_name = f'python_integration_test_task{uuid.uuid4()}'
task = SimpleTask(task_name, task_name)
workflow_executor.metadata_client.register_task_def(
[task.to_workflow_task()]
)
workflow_name = 'python_integration_test_abc1asjdk'
[task.to_workflow_task()])
workflow_name = f'python_integration_test_wf_{uuid.uuid4()}'
workflow = ConductorWorkflow(
executor=workflow_executor,
name=workflow_name,
Expand All @@ -129,6 +128,24 @@ def test_workflow_methods(
workflow.to_workflow_def(),
overwrite=True,
)

workflow_id_async = workflow_executor.start_workflow(
StartWorkflowRequest(name=workflow_name))
__update_task_by_ref_name(
workflow_executor,
workflow_id_async,
task_name
)

workflow_id_sync = workflow_executor.start_workflow(
StartWorkflowRequest(name=workflow_name))
response = __update_task_by_ref_name_sync(
workflow_executor,
workflow_id_sync,
task_name
)
print(f'workflowId: {workflow_id_sync}, response: {response}')

start_workflow_requests = [''] * workflow_quantity
for i in range(workflow_quantity):
start_workflow_requests[i] = StartWorkflowRequest(name=workflow_name)
Expand Down Expand Up @@ -385,11 +402,35 @@ def __validate_rerun_workflow(workflow_executor: WorkflowExecutor, workflow_id:
)


def _run_with_retry_attempt(f, params, retries=5) -> None:
def _run_with_retry_attempt(f, params, retries=3) -> None:
for attempt in range(retries):
try:
return f(**params)
except Exception as e:
if attempt == retries - 1:
raise e
sleep((attempt + 1) * 10)
sleep(1 << attempt)


def __update_task_by_ref_name(workflow_executor: WorkflowExecutor, workflow_id: str, task_name: str):
return _run_with_retry_attempt(
workflow_executor.update_task_by_ref_name,
params={
'task_output': {},
'workflow_id': workflow_id,
'task_reference_name': task_name,
'status': 'COMPLETED'
}
)


def __update_task_by_ref_name_sync(workflow_executor: WorkflowExecutor, workflow_id: str, task_name: str):
return _run_with_retry_attempt(
workflow_executor.update_task_by_ref_name_sync,
params={
'task_output': {},
'workflow_id': workflow_id,
'task_reference_name': task_name,
'status': 'COMPLETED'
}
)

0 comments on commit 41c035b

Please sign in to comment.