File: tests.device-cgroup

package info (click to toggle)
snapd 2.71-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 79,536 kB
  • sloc: ansic: 16,114; sh: 16,105; python: 9,941; makefile: 1,890; exp: 190; awk: 40; xml: 22
file content (226 lines) | stat: -rwxr-xr-x 7,004 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
#!/usr/bin/env python3

"""
A tool to convert device cgroup entries to a uniform v1-like format, which is:
<type> <major>:<minor> rwm
"""

import argparse
import ctypes
import json
import logging
import os
import os.path
import subprocess
import sys


def parse_arguments():
    """Build the command line parser and parse the process arguments.

    The command line has the shape `[--verbose] <snap>.<app> <command>`, where
    the command is one of dump, allow or deny. The allow and deny commands
    additionally take the device kind and a major:minor pair. The name of the
    selected command is stored in the `func` attribute of the returned
    namespace.

    When no command was given, the help text is printed and SystemExit(1) is
    raised.
    """
    parser = argparse.ArgumentParser(
        description="tool to control device cgroup settings"
    )
    parser.add_argument(
        "--verbose", action="store_true", default=False, help="verbose logging"
    )
    parser.add_argument(
        "snap_app",
        metavar="snap.app",
        help="snap and application in the format <snap>.<app>",
    )
    commands = parser.add_subparsers()

    dump_cmd = commands.add_parser("dump", description="dump cgroup settings")
    dump_cmd.set_defaults(func="dump")

    # allow and deny accept exactly the same positional arguments
    for action in ("allow", "deny"):
        change_cmd = commands.add_parser(
            action, description="{} device access".format(action)
        )
        change_cmd.add_argument(
            "kind", choices=("c", "b"), help="device type (c)har, (b)lock"
        )
        change_cmd.add_argument(
            "major_minor", metavar="major:minor", help="major:minor"
        )
        change_cmd.set_defaults(func=action)

    parsed = parser.parse_args()
    if hasattr(parsed, "func"):
        return parsed
    # no command was selected on the command line
    parser.print_help()
    raise SystemExit(1)


def is_cgroup_v2():
    """Return True when /sys/fs/cgroup is mounted as cgroup v2.

    The file system type of /sys/fs/cgroup is obtained by running `stat -f` and
    is compared against the cgroup2 super block magic number.
    """
    cgroup2_magic = "0x63677270"
    stat_cmd = ["stat", "-f", "-c", "0x%t", "/sys/fs/cgroup"]
    fstype = subprocess.check_output(stat_cmd).decode().strip()
    logging.debug("/sys/fs/cgroup FS type %s", fstype)
    return cgroup2_magic == fstype


def dump_v1(snap_app):
    """Write the device entries of the snap's v1 device cgroup to stdout. The
    content of devices.list is copied verbatim, as the entries are already in
    the expected format"""
    devices_list = os.path.join(
        "/sys/fs/cgroup/devices/", "snap.{}".format(snap_app), "devices.list"
    )
    with open(devices_list) as listing:
        content = listing.read()
    sys.stdout.write(content)


def device_change_v1(snap_app, allow, kind, major_minor):
    """Allow or deny a device in the snap's v1 device cgroup.

    The entry "<kind> <major_minor> rwm" is written to the devices.allow file
    of the group when allow is true, and to devices.deny otherwise.
    """
    if allow:
        control_file = "devices.allow"
    else:
        control_file = "devices.deny"
    control_path = os.path.join(
        "/sys/fs/cgroup/devices/", "snap.{}".format(snap_app), control_file
    )
    entry = "{} {} rwm".format(kind, major_minor)
    with open(control_path, "w+") as control:
        control.write(entry)


class DeviceCgroupV2Key(ctypes.Structure):
    """Key of an entry in the v2 device cgroup map.

    The structure is packed and 9 bytes long: a one byte device type followed
    by 32 bit major and minor numbers. The fields must match
    sc_cgroup_v2_device_key defined in
    cmd/snap-confine/device-cgroup-support.c
    """

    # minor number standing for any minor, equal to UINT32_MAX
    DEVICE_MINOR_ANY = (1 << 32) - 1

    # byte alignment, so that no padding follows the one byte type field
    _pack_ = 1
    _fields_ = [
        # the device type is stored as the v1-like character, 'b' or 'c'
        ("_typ", ctypes.c_char),
        ("major", ctypes.c_uint32),
        ("minor", ctypes.c_uint32),
    ]

    @property
    def typ(self):
        """Device type as a string rather than bytes."""
        raw_type = self._typ
        return raw_type.decode()


def dump_v2(snap_app):
    """Print the device entries of a v2 device cgroup in the v1 format.

    The entries live in a BPF map created by snap-confine and pinned under
    /sys/fs/bpf/snap. The map is dumped as JSON using bpftool, and each key is
    decoded and printed as "<type> <major>:<minor> rwm".

    Exits with status 1 when snap_app is not in the <snap>.<app> format, and
    raises RuntimeError when an entry carries no key or a key of an unexpected
    size.
    """
    snap, dot, app = snap_app.partition(".")
    if not dot:
        print("error: malformed snap.app name", file=sys.stderr)
        raise SystemExit(1)

    pin_path = "/sys/fs/bpf/snap/{}".format("snap_{}_{}".format(snap, app))
    bpftool_cmd = ["bpftool", "map", "dump", "pinned", pin_path, "-j"]
    output = subprocess.check_output(bpftool_cmd)
    logging.debug("got bpftool output %s", output)

    for entry in json.loads(output):
        key_bytes = entry.get("key")
        if not key_bytes:
            raise RuntimeError("unexpected object content in {}".format(output))
        expected_size = ctypes.sizeof(DeviceCgroupV2Key)
        if len(key_bytes) != expected_size:
            raise RuntimeError(
                "unexpected size of raw key {} (expected {})".format(
                    len(key_bytes), expected_size
                )
            )
        # each element of the key is a string holding one byte in hex notation
        buf = bytearray(int(value, 16) for value in key_bytes)
        key = DeviceCgroupV2Key.from_buffer(buf)
        # a v1 cgroup lists the minor number either as the actual value or as
        # '*', which stands for any minor number, use the same notation for
        # the corresponding special value of the v2 key
        any_minor = key.minor == DeviceCgroupV2Key.DEVICE_MINOR_ANY
        line = "{typ} {major}:{minor} rwm".format(
            typ=key.typ,
            major=key.major,
            minor="*" if any_minor else key.minor,
        )
        print(line)


def device_change_v2(snap_app, allow, kind, major_minor):
    """Allow or deny a given combination of device type and major:minor in a device
    cgroup v2 map.

    The map is the BPF map pinned at /sys/fs/bpf/snap/snap_<snap>_<app>.
    Allowing a device updates the map with the device key using bpftool,
    denying a device deletes the key from the map.

    Arguments:
        snap_app: name in the <snap>.<app> format
        allow: update the map with the key when true, delete the key otherwise
        kind: device type character, 'c' or 'b'
        major_minor: device numbers in the <major>:<minor> format, the minor
            can be '*' which stands for any minor number

    Exits with status 1 (SystemExit) when snap_app or major_minor are
    malformed or the numbers do not fit the 32 bit fields of the key. Raises
    subprocess.CalledProcessError when bpftool fails.
    """
    # largest value which fits in the unsigned 32 bit fields of the key
    uint32_max = 0xFFFFFFFF

    try:
        snap, app = snap_app.split(".", maxsplit=1)
    except ValueError:
        print("error: malformed snap.app name", file=sys.stderr)
        raise SystemExit(1)
    try:
        major, minor = major_minor.split(":", maxsplit=1)
        major_num = int(major)
        # None stands for the '*' wildcard until the key is filled in
        minor_num = None if minor == "*" else int(minor)
    except ValueError:
        # either the ':' separator is missing or the numbers are not integers
        print("error: malformed major:minor", file=sys.stderr)
        raise SystemExit(1)
    # ctypes does not check for overflow and silently wraps values outside of
    # the uint32 range, eg. a minor of -1 would turn into 'any minor'
    numbers = [major_num] if minor_num is None else [major_num, minor_num]
    if not all(0 <= num <= uint32_max for num in numbers):
        print("error: major:minor out of range", file=sys.stderr)
        raise SystemExit(1)

    map_pin_name = "snap_{}_{}".format(snap, app)
    key = DeviceCgroupV2Key()
    key.major = major_num
    if minor_num is None:
        key.minor = DeviceCgroupV2Key.DEVICE_MINOR_ANY
    else:
        key.minor = minor_num
    key._typ = kind.encode()[0]
    # bpftool takes the key as a sequence of separate byte arguments
    key_arg = ["0x%02x" % b for b in bytes(key)]

    pin_path = "/sys/fs/bpf/snap/{}".format(map_pin_name)
    if allow:
        cmd = ["bpftool", "map", "update", "pinned", pin_path, "key"]
        cmd.extend(key_arg)
        cmd.extend(["value", "01", "any"])
    else:
        cmd = ["bpftool", "map", "delete", "pinned", pin_path, "key"]
        cmd.extend(key_arg)
    logging.debug("running command %s", cmd)
    # the output is of no interest, discard it rather than attach a pipe
    # which nobody reads from and which could block bpftool once it fills up
    subprocess.check_call(
        cmd,
        stdout=subprocess.DEVNULL,
    )


def main(opts):
    """Run the command selected on the command line.

    The cgroup version in use is detected first, and the command is then
    dispatched to the matching v1 or v2 implementation. Any command other than
    dump, allow or deny is ignored.
    """
    command = opts.func
    is_v2 = is_cgroup_v2()
    logging.debug("cgroup v2? %s", is_v2)
    if command == "dump":
        dump = dump_v2 if is_v2 else dump_v1
        dump(opts.snap_app)
        return
    if command in ("allow", "deny"):
        change = device_change_v2 if is_v2 else device_change_v1
        change(opts.snap_app, command == "allow", opts.kind, opts.major_minor)


if __name__ == "__main__":
    # the command line is parsed first, as it selects the logging level
    options = parse_arguments()
    lvl = logging.DEBUG if options.verbose else logging.INFO
    logging.basicConfig(level=lvl)
    main(options)