-
Notifications
You must be signed in to change notification settings - Fork 3.3k
/
Copy pathjob_crud.py
112 lines (89 loc) · 3.27 KB
/
job_crud.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Creates, updates, and deletes a job object.
"""
from os import path
from time import sleep
import yaml
from kubernetes import client, config
# Name of the example job. Used as the job's metadata name when the manifest
# is built and as the lookup key when the job is read, patched, and deleted.
JOB_NAME = "pi"
def create_job_object():
    """Build the in-memory ``V1Job`` object for the example job.

    The pod template holds a single container named ``pi`` that uses the
    ``perl`` image to run the one-liner ``print bpi(2000)``. The pod's
    restart policy is ``Never`` and the job's ``backoff_limit`` is 4.

    Returns:
        A ``client.V1Job`` whose metadata name is ``JOB_NAME``. Nothing is
        sent to the cluster by this function.
    """
    # The only container in the pod.
    pi_container = client.V1Container(
        name="pi",
        image="perl",
        command=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    )

    # Pod template wrapping that container, labelled app=pi.
    pod_spec = client.V1PodSpec(
        restart_policy="Never",
        containers=[pi_container],
    )
    pod_template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels={"app": "pi"}),
        spec=pod_spec,
    )

    # Job specification, then the job object itself.
    job_spec = client.V1JobSpec(template=pod_template, backoff_limit=4)
    return client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=client.V1ObjectMeta(name=JOB_NAME),
        spec=job_spec,
    )
def create_job(api_instance, job):
    """Create *job* in the ``default`` namespace, then poll its status.

    Args:
        api_instance: Batch API client exposing ``create_namespaced_job``
            (``main`` passes a ``client.BatchV1Api``).
        job: The job object to submit as the request body.
    """
    created = api_instance.create_namespaced_job(
        namespace="default",
        body=job,
    )
    print(f"Job created. status='{created.status!s}'")

    # Hand over to the polling helper, which returns once the job's status
    # carries a succeeded or failed count.
    get_job_status(api_instance)
def get_job_status(api_instance, timeout_seconds=None, poll_interval=1):
    """Poll the job until its status reports a succeeded or failed count.

    The status of job ``JOB_NAME`` in the ``default`` namespace is read and
    printed on every poll. The function returns as soon as either
    ``status.succeeded`` or ``status.failed`` is set, without sleeping an
    extra interval after the final read.

    NOTE(review): a non-None ``failed`` count is treated as terminal, exactly
    as before. It may only mean that a pod failed while retries remain;
    confirm whether the job's conditions should be inspected instead.

    Args:
        api_instance: Batch API client exposing
            ``read_namespaced_job_status``.
        timeout_seconds: Upper bound, in seconds, on the total wait. ``None``
            (the default) waits indefinitely, which is the previous behavior.
        poll_interval: Seconds to sleep between two polls. Defaults to 1.

    Raises:
        TimeoutError: If ``timeout_seconds`` elapses before the job reports
            a succeeded or failed count.
    """
    # Function-scope import so the block does not rely on a new file-level
    # import; monotonic() is immune to wall-clock adjustments.
    from time import monotonic

    deadline = None
    if timeout_seconds is not None:
        deadline = monotonic() + timeout_seconds

    while True:
        api_response = api_instance.read_namespaced_job_status(
            name=JOB_NAME,
            namespace="default")
        status = api_response.status
        print(f"Job status='{str(status)}'")

        if status.succeeded is not None or status.failed is not None:
            return

        # Only give up after at least one poll, so a zero timeout still
        # observes the current state once.
        if deadline is not None and monotonic() >= deadline:
            raise TimeoutError(
                f"Job '{JOB_NAME}' did not finish within "
                f"{timeout_seconds} seconds")

        sleep(poll_interval)
def update_job(api_instance, job):
    """Set the first container's image on *job* and patch the existing job.

    Args:
        api_instance: Batch API client exposing ``patch_namespaced_job``.
        job: Job object; it is modified in place and then sent as the patch
            body for the job named ``JOB_NAME`` in the ``default`` namespace.
    """
    # Assign the image on the pod template's first container.
    first_container = job.spec.template.spec.containers[0]
    first_container.image = "perl"

    patched = api_instance.patch_namespaced_job(
        body=job,
        name=JOB_NAME,
        namespace="default",
    )
    print(f"Job updated. status='{patched.status!s}'")
def delete_job(api_instance):
    """Delete the job named ``JOB_NAME`` from the ``default`` namespace.

    The request carries delete options with a ``Foreground`` propagation
    policy and a grace period of 5 seconds.

    Args:
        api_instance: Batch API client exposing ``delete_namespaced_job``.
    """
    delete_options = client.V1DeleteOptions(
        propagation_policy='Foreground',
        grace_period_seconds=5,
    )
    deleted = api_instance.delete_namespaced_job(
        body=delete_options,
        name=JOB_NAME,
        namespace="default",
    )
    print(f"Job deleted. status='{deleted.status!s}'")
def main():
    """Run the example end to end: create the job, update it, delete it."""
    # Called without arguments, so the kube config is loaded from the
    # default location. Configuration can alternatively be set directly on
    # the Configuration class.
    config.load_kube_config()
    batch_api = client.BatchV1Api()

    # Build the job with the client-python API. Per the original note, it is
    # the same job as `pi-job.yaml` in the /examples folder.
    pi_job = create_job_object()

    create_job(batch_api, pi_job)
    update_job(batch_api, pi_job)
    delete_job(batch_api)
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()