1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
|
#!/usr/bin/python3
# -*- coding:utf-8 -*-
import json
import time
from kubernetes import client, config
config.load_kube_config()
def create_namespaced_cron_job(namespace='default', body=None):
cronjob_json = body
if body is None:
print('body is required!')
exit(0)
name = body['metadata']['name']
if judge_crontab_exists(namespace, name):
print(f'{name} exists, please do not repeat!')
else:
v1 = client.BatchV1Api()
ret = v1.create_namespaced_cron_job(namespace=namespace, body=cronjob_json, pretty=True,
_preload_content=False, async_req=False)
ret_dict = json.loads(ret.data)
print(f'create succeed\n{json.dumps(ret_dict)}')
def delete_namespaced_cron_job(namespace='default', name=None):
if name is None:
print('name is required!')
exit(0)
if not judge_crontab_exists(namespace, name):
print(f"{name} doesn't exists, please enter a new one!")
else:
v1 = client.BatchV1Api()
ret = v1.delete_namespaced_cron_job(name=name, namespace=namespace, _preload_content=False, async_req=False)
ret_dict = json.loads(ret.data)
print(f'delete succeed\n{json.dumps(ret_dict)}')
def patch_namespaced_cron_job(namespace='default', body=None):
cronjob_json = body
if body is None:
print('body is required!')
exit(0)
name = body['metadata']['name']
if judge_crontab_exists(namespace, name):
v1 = client.BatchV1Api()
ret = v1.patch_namespaced_cron_job(name=name, namespace=namespace, body=cronjob_json,
_preload_content=False, async_req=False)
ret_dict = json.loads(ret.data)
print(f'patch succeed\n{json.dumps(ret_dict)}')
else:
print(f"{name} doesn't exists, please enter a new one!")
def get_cronjob_list(namespace='default'):
v1 = client.BatchV1Api()
ret = v1.list_namespaced_cron_job(namespace=namespace, pretty=True, _preload_content=False)
cron_job_list = json.loads(ret.data)
print(f'cronjob number={len(cron_job_list["items"])}')
return cron_job_list["items"]
def judge_crontab_exists(namespace, name):
cron_job_list = get_cronjob_list(namespace)
for cron_job in cron_job_list:
if name == cron_job['metadata']['name']:
return True
return False
def get_cronjob_body(namespace, name, command):
body = {
"apiVersion": "batch/v1",
"kind": "CronJob",
"metadata": {
"name": name,
"namespace": namespace
},
"spec": {
"schedule": "*/1 * * * *",
"concurrencyPolicy": "Allow",
"suspend": False,
"jobTemplate": {
"spec": {
"template": {
"spec": {
"containers": [
{
"name": name,
"image": "busybox:1.35",
"command": command
}
],
"restartPolicy": "Never"
}
}
}
},
"successfulJobsHistoryLimit": 3,
"failedJobsHistoryLimit": 1
}
}
return body
if __name__ == '__main__':
# get
cronjob_list = get_cronjob_list()
# delete
delete_namespaced_cron_job('default', 'hostname')
time.sleep(2)
# create
container_command = [
"/bin/sh",
"-c",
"date; echo Hello from the Kubernetes cluster; hostname"
]
hostname_json = get_cronjob_body('default', 'hostname', container_command)
create_namespaced_cron_job('default', hostname_json)
# update
container_command[2] = "date; echo this is patch; hostname"
hostname_json = get_cronjob_body('default', 'hostname', container_command)
patch_namespaced_cron_job('default', hostname_json)
|