1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
|
#!/usr/bin/env python3
"""
A tool to convert device cgroup entries to a uniform v1-like format, which is:
<type> <major>:<minor> rwm
"""
import argparse
import ctypes
import json
import logging
import os
import os.path
import subprocess
import sys
def parse_arguments():
parser = argparse.ArgumentParser(
description="tool to control device cgroup settings"
)
parser.add_argument(
"--verbose", help="verbose logging", action="store_true", default=False
)
parser.add_argument(
"snap_app",
help="snap and application in the format <snap>.<app>",
metavar="snap.app",
)
sub = parser.add_subparsers()
dump = sub.add_parser("dump", description="dump cgroup settings")
dump.set_defaults(func="dump")
allow = sub.add_parser("allow", description="allow device access")
allow.add_argument("kind", choices=("c", "b"), help="device type (c)har, (b)lock")
allow.add_argument("major_minor", metavar="major:minor", help="major:minor")
allow.set_defaults(func="allow")
deny = sub.add_parser("deny", description="deny device access")
deny.add_argument("kind", choices=("c", "b"), help="device type (c)har, (b)lock")
deny.add_argument("major_minor", metavar="major:minor", help="major:minor")
deny.set_defaults(func="deny")
opts = parser.parse_args()
if not hasattr(opts, "func"):
parser.print_help()
raise SystemExit(1)
return opts
def is_cgroup_v2():
CGROUP2_SUPER_MAGIC = "0x63677270"
output = subprocess.check_output(
["stat", "-f", "-c", "0x%t", "/sys/fs/cgroup"],
)
fstype = output.decode().strip()
logging.debug("/sys/fs/cgroup FS type %s", fstype)
return fstype == CGROUP2_SUPER_MAGIC
def dump_v1(snap_app):
"""Dump device entries from a v1 device cgroup. Entries are already formatted
in the correct way"""
group_name = "snap.{}".format(snap_app)
with open(
os.path.join("/sys/fs/cgroup/devices/", group_name, "devices.list")
) as inf:
sys.stdout.write(inf.read())
def device_change_v1(snap_app, allow, kind, major_minor):
group_name = "snap.{}".format(snap_app)
devices_control = "devices.allow" if allow else "devices.deny"
with open(
os.path.join("/sys/fs/cgroup/devices/", group_name, devices_control), "w+"
) as outf:
outf.write("{} {} rwm".format(kind, major_minor))
class DeviceCgroupV2Key(ctypes.Structure):
"""A class representing v2 device cgroup key entry, which is 9 bytes long. The
fields must match sc_cgroup_v2_device_key defined in
cmd/snap-confine/device-cgroup-support.c
"""
# UINT32_MAX
DEVICE_MINOR_ANY = 0xFFFFFFFF
_pack_ = 1
_fields_ = [
# type is already a correct v1-like char 'b', 'c'
("_typ", ctypes.c_char),
("major", ctypes.c_uint32),
("minor", ctypes.c_uint32),
]
@property
def typ(self):
return self._typ.decode()
def dump_v2(snap_app):
"""Dump device entries for a v2 device cgroup. Entries are stored in a BPF map
created by snap-confine. We dump its contents using bpftool, and then
convert each entry to a format that looks like v1.
"""
try:
snap, app = snap_app.split(".", maxsplit=1)
except ValueError:
print("error: malformed snap.app name", file=sys.stderr)
raise SystemExit(1)
map_pin_name = "snap_{}_{}".format(snap, app)
output = subprocess.check_output(
[
"bpftool",
"map",
"dump",
"pinned",
"/sys/fs/bpf/snap/{}".format(map_pin_name),
"-j",
],
)
logging.debug("got bpftool output %s", output)
entries = json.loads(output)
for entry in entries:
raw_key = entry.get("key")
if not raw_key:
raise RuntimeError("unexpected object content in {}".format(output))
if len(raw_key) != ctypes.sizeof(DeviceCgroupV2Key):
raise RuntimeError(
"unexpected size of raw key {} (expected {})".format(
len(raw_key), ctypes.sizeof(DeviceCgroupV2Key)
)
)
raw = bytearray([int(v, 16) for v in raw_key])
key = DeviceCgroupV2Key.from_buffer(raw)
# when dumping a cgroup v1 format, the minor number can be presented as
# either the actual value, or a special value '*' which represents any
# minor number, make sure we present the v2 minor number in the same
# format as well
minor = key.minor
if minor == DeviceCgroupV2Key.DEVICE_MINOR_ANY:
minor = "*"
print(
"{typ} {major}:{minor} rwm".format(
typ=key.typ,
major=key.major,
minor=minor,
)
)
def device_change_v2(snap_app, allow, kind, major_minor):
"""Allow or deny a given combination of device type and major:minor in a device
cgroup v2 map.
"""
try:
snap, app = snap_app.split(".", maxsplit=1)
except ValueError:
print("error: malformed snap.app name", file=sys.stderr)
raise SystemExit(1)
try:
major, minor = major_minor.split(":", maxsplit=1)
except ValueError:
print("error: malformed major:minor", file=sys.stderr)
raise SystemExit(1)
map_pin_name = "snap_{}_{}".format(snap, app)
key = DeviceCgroupV2Key()
key.major = int(major)
if minor == "*":
key.minor = DeviceCgroupV2Key.DEVICE_MINOR_ANY
else:
key.minor = int(minor)
key._typ = kind.encode()[0]
key_arg = ["0x%02x" % b for b in bytes(key)]
if allow:
cmd = [
"bpftool",
"map",
"update",
"pinned",
"/sys/fs/bpf/snap/{}".format(map_pin_name),
"key",
]
cmd.extend(key_arg)
cmd.extend(["value", "01", "any"])
else:
cmd = [
"bpftool",
"map",
"delete",
"pinned",
"/sys/fs/bpf/snap/{}".format(map_pin_name),
"key",
]
cmd.extend(key_arg)
logging.debug("running command %s", cmd)
subprocess.check_call(
cmd,
stdout=subprocess.PIPE,
)
def main(opts):
func = opts.func
is_v2 = is_cgroup_v2()
logging.debug("cgroup v2? %s", is_v2)
if func == "dump":
if not is_v2:
dump_v1(opts.snap_app)
else:
dump_v2(opts.snap_app)
elif func == "allow" or func == "deny":
allow_or_deny = func == "allow"
if not is_v2:
device_change_v1(opts.snap_app, allow_or_deny, opts.kind, opts.major_minor)
else:
device_change_v2(opts.snap_app, allow_or_deny, opts.kind, opts.major_minor)
if __name__ == "__main__":
options = parse_arguments()
lvl = logging.INFO
if options.verbose:
lvl = logging.DEBUG
logging.basicConfig(level=lvl)
main(options)
|