-
Notifications
You must be signed in to change notification settings - Fork 29
/
workflow_client.py
109 lines (88 loc) · 3.64 KB
/
workflow_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from abc import ABC, abstractmethod
from typing import Optional, List
from conductor.client.http.models import WorkflowRun, SkipTaskRequest, WorkflowStatus, \
ScrollableSearchResultWorkflowSummary
from conductor.client.http.models.correlation_ids_search_request import CorrelationIdsSearchRequest
from conductor.client.http.models.rerun_workflow_request import RerunWorkflowRequest
from conductor.client.http.models.start_workflow_request import StartWorkflowRequest
from conductor.client.http.models.workflow import Workflow
from conductor.client.http.models.workflow_state_update import WorkflowStateUpdate
from conductor.client.http.models.workflow_test_request import WorkflowTestRequest
class WorkflowClient(ABC):
@abstractmethod
def start_workflow(self, start_workflow_request: StartWorkflowRequest) -> str:
pass
@abstractmethod
def get_workflow(self, workflow_id: str, include_tasks: Optional[bool] = True) -> Workflow:
pass
@abstractmethod
def get_workflow_status(self, workflow_id: str, include_output: bool = None,
include_variables: bool = None) -> WorkflowStatus:
pass
@abstractmethod
def delete_workflow(self, workflow_id: str, archive_workflow: Optional[bool] = True):
pass
@abstractmethod
def terminate_workflow(self, workflow_id: str, reason: Optional[str] = None,
trigger_failure_workflow: bool = False):
pass
@abstractmethod
def execute_workflow(
self,
start_workflow_request: StartWorkflowRequest,
request_id: str = None,
wait_until_task_ref: Optional[str] = None,
wait_for_seconds: int = 30
) -> WorkflowRun:
pass
@abstractmethod
def pause_workflow(self, workflow_id: str):
pass
@abstractmethod
def resume_workflow(self, workflow_id: str):
pass
@abstractmethod
def restart_workflow(self, workflow_id: str, use_latest_def: Optional[bool] = False):
pass
@abstractmethod
def retry_workflow(self, workflow_id: str, resume_subworkflow_tasks: Optional[bool] = False):
pass
@abstractmethod
def rerun_workflow(self, workflow_id: str, rerun_workflow_request: RerunWorkflowRequest):
pass
@abstractmethod
def skip_task_from_workflow(self, workflow_id: str, task_reference_name: str, request: SkipTaskRequest):
pass
@abstractmethod
def test_workflow(self, test_request: WorkflowTestRequest) -> Workflow:
pass
@abstractmethod
def search(self, start: int = 0, size: int = 100, free_text: str = '*',
query: str = None) -> ScrollableSearchResultWorkflowSummary:
pass
@abstractmethod
def get_by_correlation_ids_in_batch(
self,
batch_request: CorrelationIdsSearchRequest,
include_completed: bool = False,
include_tasks: bool = False) -> dict[str, List[Workflow]]:
pass
@abstractmethod
def get_by_correlation_ids(
self,
workflow_name: str,
correlation_ids: List[str],
include_completed: bool = False,
include_tasks: bool = False
) -> dict[str, List[Workflow]]:
pass
@abstractmethod
def remove_workflow(self, workflow_id: str):
pass
@abstractmethod
def update_variables(self, workflow_id: str, variables: dict[str, object] = {}) -> None:
pass
@abstractmethod
def update_state(self, workflow_id: str, update_requesst: WorkflowStateUpdate,
wait_until_task_ref_names: List[str] = None, wait_for_seconds : int = None) -> WorkflowRun:
pass