forked from parallelworks/pw-cluster-automation
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.py
executable file
·136 lines (117 loc) · 4.25 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import requests
import json
import pprint as pp
import base64
class Client():
    """Thin HTTP client for the REST API rooted at ``<url>/api``.

    All requests go through one shared ``requests.Session``. Every method
    except ``update_resource`` sends ``self.headers``: a JSON content type
    plus a Basic ``Authorization`` header built from the API key.
    """

    # Cluster types accepted by create_v2_cluster.
    _V2_CLUSTER_TYPES = ('pclusterv2', 'gclusterv2', 'azclusterv2')

    def __init__(self, url, key):
        """Store the connection settings and prepare the shared headers.

        Args:
            url: Base URL of the platform; ``/api`` is appended to it.
            key: API key. It is base64-encoded as-is and sent as the Basic
                credential, and is also used as the ``key`` query parameter
                by ``update_resource``.
        """
        self.url = url
        # Strip a trailing slash so "https://host/" does not yield "//api".
        self.api = url.rstrip('/') + '/api'
        self.key = key
        self.session = requests.Session()
        token = base64.b64encode(bytes(self.key, 'utf-8')).decode('utf-8')
        self.headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Basic ' + token
        }

    @staticmethod
    def _json(response):
        """Raise on an HTTP error status, else return the decoded JSON body."""
        response.raise_for_status()
        return json.loads(response.text)

    @staticmethod
    def _text(response):
        """Raise on an HTTP error status, else return the raw body text."""
        response.raise_for_status()
        return response.text

    def get_resources(self):
        """Return the decoded JSON body of ``GET /resources``."""
        return self._json(
            self.session.get(self.api + "/resources", headers=self.headers)
        )

    def get_resource(self, name):
        """Return every resource whose ``name`` matches, ignoring case.

        Args:
            name: Resource name to look for.

        Returns:
            A list of matching entries from ``get_resources()``; empty when
            nothing matches.
        """
        wanted = name.lower()
        return [
            entry for entry in self.get_resources()
            if entry['name'].lower() == wanted
        ]

    def delete_resource(self, id: str):
        """Send ``DELETE /v2/resources/<id>`` and return the response text."""
        return self._text(
            self.session.delete(
                self.api + "/v2/resources/{}".format(id),
                headers=self.headers
            )
        )

    def create_v2_cluster(self, name: str, description: str, tags: str, type: str):
        """Create a v2 cluster resource via ``POST /v2/resources``.

        Args:
            name: Resource name.
            description: Resource description.
            tags: Resource tags.
            type: One of ``pclusterv2``, ``gclusterv2`` or ``azclusterv2``.

        Returns:
            The decoded JSON response body.

        Raises:
            Exception: If ``type`` is not one of the accepted cluster types.
        """
        if type not in self._V2_CLUSTER_TYPES:
            raise Exception("Invalid cluster type")
        payload = {
            'name': name,
            'description': description,
            'tags': tags,
            'type': type,
            'params': {
                "jobsPerNode": ""
            }
        }
        # Send a real JSON body: passing the dict via `data=` form-encodes it
        # (flattening the nested 'params') while the headers claim JSON.
        return self._json(
            self.session.post(
                self.api + "/v2/resources",
                json=payload,
                headers=self.headers
            )
        )

    def update_v2_cluster(self, id: str, cluster_definition):
        """Replace a cluster definition via ``PUT /v2/resources/<id>``.

        Args:
            id: Resource identifier; must be neither ``None`` nor empty.
            cluster_definition: JSON-serializable body sent with the request.

        Returns:
            The decoded JSON response body.

        Raises:
            Exception: If ``id`` is ``None`` or an empty string.
        """
        if id is None or id == "":
            raise Exception("Invalid cluster id")
        return self._json(
            self.session.put(
                self.api + "/v2/resources/{}".format(id),
                json=cluster_definition,
                headers=self.headers
            )
        )

    def start_resource(self, id: str):
        """Call ``GET /resources/start?id=<id>`` and return the response text."""
        return self._text(
            self.session.get(
                self.api + "/resources/start",
                params={'id': id},
                headers=self.headers
            )
        )

    def stop_resource(self, id):
        """Call ``GET /resources/stop?id=<id>`` and return the response text."""
        return self._text(
            self.session.get(
                self.api + "/resources/stop",
                params={'id': id},
                headers=self.headers
            )
        )

    def update_resource(self, name, params):
        """Set resource parameters via ``POST /resources/set``.

        The API key, the resource name and every item of ``params`` are sent
        as query parameters, in that order.

        Args:
            name: Name of the resource to update.
            params: Mapping of setting name to value; keys and values are
                converted with ``str()`` before being sent.

        Returns:
            The raw response body text.
        """
        # FIXME: Update headers (this call still authenticates through the
        # `key` query parameter and sends no Authorization header).
        # Ordered pairs keep the original parameter order and duplicates,
        # and let requests URL-encode names and values instead of
        # concatenating them raw into the URL.
        query = [('key', str(self.key)), ('name', str(name))]
        query.extend((str(field), str(value)) for field, value in params.items())
        return self._text(
            self.session.post(self.api + "/resources/set", params=query)
        )

    def get_identity(self):
        """Return the decoded JSON body of ``GET /v2/auth/session``."""
        return self._json(
            self.session.get(self.api + "/v2/auth/session", headers=self.headers)
        )

    def run_workflow(self, name, inputs):
        """Start a workflow via ``POST /v2/workflows/<name>/start``.

        Args:
            name: Workflow name, inserted into the URL path unchanged.
            inputs: Value sent as the ``variables`` field of the JSON body.

        Returns:
            The decoded JSON response body.
        """
        payload = {
            'variables': inputs
        }
        return self._json(
            self.session.post(
                self.api + "/v2/workflows/" + name + "/start",
                json=payload,
                headers=self.headers
            )
        )

    def get_latest_job_status(self, workflow_name):
        """Return the decoded JSON of ``GET /v2/workflows/<name>/jobs/0``.

        NOTE(review): job index 0 is presumably the most recent job; confirm
        against the API.
        """
        return self._json(
            self.session.get(
                self.api + "/v2/workflows/" + workflow_name + "/jobs/0",
                headers=self.headers
            )
        )

    def get_storages(self):
        """Return the decoded JSON body of ``GET /storage``."""
        return self._json(
            self.session.get(self.api + "/storage", headers=self.headers)
        )

    def get_bucket_cred(self, id: str):
        """Request a bucket token via ``POST /v2/vault/getBucketToken``.

        Args:
            id: Bucket identifier, sent as ``bucketID`` in the JSON body.

        Returns:
            The decoded JSON response body.
        """
        payload = {
            'bucketID': id
        }
        return self._json(
            self.session.post(
                self.api + "/v2/vault/getBucketToken",
                json=payload,
                headers=self.headers
            )
        )