File: cronjob_crud.py

package info (click to toggle)
python-kubernetes 30.1.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 39,984 kB
  • sloc: python: 126,462; sh: 699; makefile: 46
file content (128 lines) | stat: -rw-r--r-- 4,097 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/python3
# -*- coding:utf-8 -*-

import json
import time

from kubernetes import client, config

config.load_kube_config()


def create_namespaced_cron_job(namespace='default', body=None):
    cronjob_json = body
    if body is None:
        print('body is required!')
        exit(0)
    name = body['metadata']['name']
    if judge_crontab_exists(namespace, name):
        print(f'{name} exists, please do not repeat!')
    else:
        v1 = client.BatchV1Api()
        ret = v1.create_namespaced_cron_job(namespace=namespace, body=cronjob_json, pretty=True,
                                            _preload_content=False, async_req=False)
        ret_dict = json.loads(ret.data)
        print(f'create succeed\n{json.dumps(ret_dict)}')


def delete_namespaced_cron_job(namespace='default', name=None):
    if name is None:
        print('name is required!')
        exit(0)
    if not judge_crontab_exists(namespace, name):
        print(f"{name} doesn't exists, please enter a new one!")
    else:
        v1 = client.BatchV1Api()
        ret = v1.delete_namespaced_cron_job(name=name, namespace=namespace, _preload_content=False, async_req=False)
        ret_dict = json.loads(ret.data)
        print(f'delete succeed\n{json.dumps(ret_dict)}')


def patch_namespaced_cron_job(namespace='default', body=None):
    cronjob_json = body
    if body is None:
        print('body is required!')
        exit(0)
    name = body['metadata']['name']
    if judge_crontab_exists(namespace, name):
        v1 = client.BatchV1Api()
        ret = v1.patch_namespaced_cron_job(name=name, namespace=namespace, body=cronjob_json,
                                           _preload_content=False, async_req=False)
        ret_dict = json.loads(ret.data)
        print(f'patch succeed\n{json.dumps(ret_dict)}')
    else:
        print(f"{name} doesn't exists, please enter a new one!")


def get_cronjob_list(namespace='default'):
    v1 = client.BatchV1Api()
    ret = v1.list_namespaced_cron_job(namespace=namespace, pretty=True, _preload_content=False)
    cron_job_list = json.loads(ret.data)
    print(f'cronjob number={len(cron_job_list["items"])}')
    return cron_job_list["items"]


def judge_crontab_exists(namespace, name):
    cron_job_list = get_cronjob_list(namespace)
    for cron_job in cron_job_list:
        if name == cron_job['metadata']['name']:
            return True
    return False


def get_cronjob_body(namespace, name, command):
    body = {
        "apiVersion": "batch/v1",
        "kind": "CronJob",
        "metadata": {
            "name": name,
            "namespace": namespace
        },
        "spec": {
            "schedule": "*/1 * * * *",
            "concurrencyPolicy": "Allow",
            "suspend": False,
            "jobTemplate": {
                "spec": {
                    "template": {
                        "spec": {
                            "containers": [
                                {
                                    "name": name,
                                    "image": "busybox:1.35",
                                    "command": command
                                }
                            ],
                            "restartPolicy": "Never"
                        }
                    }
                }
            },
            "successfulJobsHistoryLimit": 3,
            "failedJobsHistoryLimit": 1
        }
    }
    return body


if __name__ == '__main__':
    # get
    cronjob_list = get_cronjob_list()

    # delete
    delete_namespaced_cron_job('default', 'hostname')
    time.sleep(2)

    # create
    container_command = [
        "/bin/sh",
        "-c",
        "date; echo Hello from the Kubernetes cluster; hostname"
    ]
    hostname_json = get_cronjob_body('default', 'hostname', container_command)
    create_namespaced_cron_job('default', hostname_json)

    # update
    container_command[2] = "date; echo this is patch; hostname"
    hostname_json = get_cronjob_body('default', 'hostname', container_command)
    patch_namespaced_cron_job('default', hostname_json)