# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Creates, updates, and deletes a job object.
"""

from time import sleep

from kubernetes import client, config

JOB_NAME = "pi"


def create_job_object():
    # Configure the Pod template container
    container = client.V1Container(
        name="pi",
        image="perl",
        command=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"])
    # Create and configure the Pod template
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels={"app": "pi"}),
        spec=client.V1PodSpec(restart_policy="Never", containers=[container]))
    # Create the Job specification
    spec = client.V1JobSpec(
        template=template,
        backoff_limit=4)
    # Instantiate the Job object
    job = client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=client.V1ObjectMeta(name=JOB_NAME),
        spec=spec)

    return job


def create_job(api_instance, job):
    api_response = api_instance.create_namespaced_job(
        body=job,
        namespace="default")
    print(f"Job created. status='{str(api_response.status)}'")
    get_job_status(api_instance)


def get_job_status(api_instance):
    # Poll once per second until the Job reports a success or a failure
    job_completed = False
    while not job_completed:
        api_response = api_instance.read_namespaced_job_status(
            name=JOB_NAME,
            namespace="default")
        if api_response.status.succeeded is not None or \
                api_response.status.failed is not None:
            job_completed = True
        sleep(1)
        print(f"Job status='{str(api_response.status)}'")
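

# A minimal sketch, not part of the original example: get_job_status above
# keeps polling for as long as the Job reports neither success nor failure.
# The variant below adds a deadline. The name wait_for_job_completion and
# its parameters (name, namespace, timeout_seconds, poll_interval) are
# hypothetical; the only client call it relies on is
# read_namespaced_job_status, used the same way as above.
import time


def wait_for_job_completion(api_instance, name, namespace="default",
                            timeout_seconds=300, poll_interval=1.0):
    # Compute the deadline on a monotonic clock so wall-clock changes
    # do not affect the timeout.
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = api_instance.read_namespaced_job_status(
            name=name,
            namespace=namespace).status
        # Same completion test as get_job_status
        if status.succeeded is not None or status.failed is not None:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Job '{name}' did not finish within {timeout_seconds}s")
        time.sleep(poll_interval)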
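

def update_job(api_instance, job):
    # Update the container image. The value here matches the image set in
    # create_job_object, so the patch leaves the image unchanged; substitute
    # a different image to see an actual change.
    job.spec.template.spec.containers[0].image = "perl"
    api_response = api_instance.patch_namespaced_job(
        name=JOB_NAME,
        namespace="default",
        body=job)
    print(f"Job updated. status='{str(api_response.status)}'")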


def delete_job(api_instance):
    api_response = api_instance.delete_namespaced_job(
        name=JOB_NAME,
        namespace="default",
        body=client.V1DeleteOptions(
            propagation_policy='Foreground',
            grace_period_seconds=5))
    print(f"Job deleted. status='{str(api_response.status)}'")


def main():
    # Configs can be set in the Configuration class directly or by using
    # the helper utility. If no argument is provided, the config is loaded
    # from the default location.
    config.load_kube_config()
    batch_v1 = client.BatchV1Api()
    # Create a job object with the client-python API. The job created
    # here is the same as the `pi-job.yaml` in the /examples folder.
    job = create_job_object()

    create_job(batch_v1, job)

    update_job(batch_v1, job)

    delete_job(batch_v1)


if __name__ == '__main__':
    main()