File: test_delete.py

package info (click to toggle)
crun 1.24-1
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 9,736 kB
  • sloc: ansic: 66,212; python: 7,047; sh: 5,122; makefile: 768
file content (114 lines) | stat: -rwxr-xr-x 4,216 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#!/bin/env python3
# crun - OCI runtime written in C
#
# Copyright (C) 2017, 2018, 2019 Giuseppe Scrivano <giuseppe@scrivano.org>
# crun is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# crun is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with crun.  If not, see <http://www.gnu.org/licenses/>.

import json
import subprocess
import os
from tests_utils import *

def test_simple_delete():
    """Start a paused container and force-delete it with `crun delete -f`.

    The container state is checked first (status "running", matching id).
    On cgroup v1 hosts with a writable freezer hierarchy the container
    process is moved into a frozen cgroup before the delete, to exercise
    the case where the delete is stuck or reports an error.

    Returns:
        0 on success, -1 on any failed check or failed delete.
    """
    conf = base_config()
    conf['process']['args'] = ['/init', 'pause']
    add_all_namespaces(conf)

    out, container_id = run_and_get_output(conf, detach=True, hide_stderr=True)
    if out != "":
        return -1

    frozen_dir = "/sys/fs/cgroup/freezer/frozen/"
    # Bound before the try so the cleanup below can tell whether the state
    # query succeeded; otherwise a failing query would surface as a NameError
    # in the finally block and the container would never be deleted.
    state = None
    try:
        state = json.loads(run_crun_command(["state", container_id]))
        if state['status'] != "running":
            return -1
        if state['id'] != container_id:
            return -1
    finally:
        freezer_created = False
        delete_failed = False
        if (state is not None
                and not os.path.exists("/sys/fs/cgroup/cgroup.controllers")
                and os.access('/sys/fs/cgroup/freezer/', os.W_OK)):
            # cgroupv1 freezer can easily simulate stuck or breaking `crun delete -f <cid>`
            # this should be only done on cgroupv1 systems
            if not os.path.exists(frozen_dir):
                freezer_created = True
                os.makedirs(frozen_dir)
            with open(os.path.join(frozen_dir, 'tasks'), 'w') as f:
                f.write(str(state['pid']))
            with open(os.path.join(frozen_dir, 'freezer.state'), 'w') as f:
                f.write('FROZEN')

        try:
            output = run_crun_command_raw(["delete", "-f", container_id])
        except subprocess.CalledProcessError as exc:
            print("Status : FAIL", exc.returncode, exc.output)
            delete_failed = True
        else:
            # Empty output is success; 'Device or resource busy' is expected
            # on cgroup v1 and ignored. Anything else is an error.
            if output and b'Device or resource busy' not in output:
                print(output)
                delete_failed = True

        # Remove the freezer cgroup on every path, not only after a clean
        # delete, so a failing run does not leave it behind.
        if freezer_created:
            try:
                os.rmdir(frozen_dir)
            except OSError as exc:
                if not delete_failed:
                    raise
                # The test is already failing; report but keep the -1 result.
                print("could not remove", frozen_dir, exc)

        if delete_failed:
            return -1
    return 0

def test_multiple_containers_delete():
    """Delete multiple containers with a regular expression.

    Starts two paused containers, checks that both report status "running"
    with the expected id, then removes them with a single
    `crun delete -f --regex` call.

    Returns:
        0 on success, -1 on any failed check or failed delete.
    """
    conf = base_config()
    conf['process']['args'] = ['/init', 'pause']
    add_all_namespaces(conf)

    out_test1, container_id_test1 = run_and_get_output(conf, detach=True, hide_stderr=True)
    if out_test1 != "":
        return -1
    try:
        # Started inside the try so that a failure here still deletes the
        # first container, which is already running at this point.
        out_test2, container_id_test2 = run_and_get_output(conf, detach=True, hide_stderr=True)
        if out_test2 != "":
            return -1

        for container_id in (container_id_test1, container_id_test2):
            state = json.loads(run_crun_command(["state", container_id]))
            if state['status'] != "running":
                return -1
            if state['id'] != container_id:
                return -1
    finally:
        # NOTE(review): assumes the ids generated by run_and_get_output match
        # this pattern - confirm against tests_utils.
        try:
            run_crun_command_raw(["delete", "-f", "--regex", "test-*"])
        except subprocess.CalledProcessError as exc:
            print("Status : FAIL", exc.returncode, exc.output)
            return -1
    return 0

def test_help_delete():
    """Check that `crun delete --help` prints the expected usage line.

    Returns:
        0 when the usage line is present in the output, -1 otherwise.
    """
    expected_usage = "Usage: crun [OPTION...] delete CONTAINER"
    help_text = run_crun_command(["delete", "--help"])
    return 0 if expected_usage in help_text else -1

# Registry handed to tests_main: maps each test name to its function,
# in declaration order.
all_tests = {
    test_fn.__name__: test_fn
    for test_fn in (
        test_simple_delete,
        test_multiple_containers_delete,
        test_help_delete,
    )
}

# Run the registered tests only when executed as a script.
if __name__ == "__main__":
    tests_main(all_tests)