1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
|
#!/usr/bin/python3 -EsI
import dbus
import dbus.service
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GObject
from gi.repository import GLib
import os
import selinux
from subprocess import Popen, PIPE, STDOUT
class selinux_server(dbus.service.Object):
    """D-Bus object exporting SELinux management methods on "org.selinux".

    Every exported method first asks polkit whether the calling bus name is
    allowed to perform the action "org.selinux.<method name>" and raises
    ``dbus.exceptions.DBusException("Not authorized")`` when it is not.
    """

    default_polkit_auth_required = "org.selinux.semanage"

    def __init__(self, *p, **k):
        super(selinux_server, self).__init__(*p, **k)

    def is_authorized(self, sender, action_id):
        """Return polkit's verdict on whether ``sender`` may do ``action_id``.

        ``sender`` is the caller's bus name and ``action_id`` the polkit
        action to check. The value returned is the first element of the
        CheckAuthorization reply.
        """
        bus = dbus.SystemBus()
        proxy = bus.get_object('org.freedesktop.PolicyKit1', '/org/freedesktop/PolicyKit1/Authority')
        authority = dbus.Interface(proxy, dbus_interface='org.freedesktop.PolicyKit1.Authority')
        subject = ('system-bus-name', {'name': sender})
        # Flag value 1 is polkit's AllowUserInteraction (see the polkit
        # Authority D-Bus reference); the empty string is the cancellation id.
        result = authority.CheckAuthorization(subject, action_id, {}, 1, '')
        return result[0]

    #
    # The semanage method runs a transaction on a series of semanage commands,
    # these commands can take the output of customized
    #
    @dbus.service.method("org.selinux", in_signature='s', sender_keyword="sender")
    def semanage(self, buf, sender):
        """Feed ``buf`` to ``semanage import``.

        Raises DBusException carrying the command's stderr when it exits
        with a non-zero status.
        """
        if not self.is_authorized(sender, "org.selinux.semanage"):
            raise dbus.exceptions.DBusException("Not authorized")
        p = Popen(["/usr/sbin/semanage", "import"], stdout=PIPE, stderr=PIPE, stdin=PIPE, universal_newlines=True)
        # Hand the input to communicate() instead of writing to p.stdin
        # directly: it feeds stdin while draining stdout/stderr, so neither
        # side can block on a full pipe, and it closes stdin afterwards.
        output = p.communicate(buf)
        if p.returncode:
            raise dbus.exceptions.DBusException(output[1])

    #
    # The customized method will return all of the customizations for policy
    # on the server. This output can be used with the semanage method on
    # another server to make the two systems have duplicate policy.
    #
    @dbus.service.method("org.selinux", in_signature='', out_signature='s', sender_keyword="sender")
    def customized(self, sender):
        """Return the standard output of ``semanage export``.

        Raises OSError (with the command's stderr in the message) when the
        command exits with a non-zero status.
        """
        if not self.is_authorized(sender, "org.selinux.customized"):
            raise dbus.exceptions.DBusException("Not authorized")
        p = Popen(["/usr/sbin/semanage", "export"], stdout=PIPE, stderr=PIPE, universal_newlines=True)
        # Read both pipes together; reading stdout alone to EOF can stall
        # if the child blocks writing to a full stderr pipe.
        buf, err = p.communicate()
        if p.returncode:
            raise OSError("Failed to read SELinux configuration: %s" % err)
        return buf

    #
    # The semodule_list method will return the output of semodule --list=full, using the customized polkit,
    # since this is a readonly behaviour
    #
    @dbus.service.method("org.selinux", in_signature='', out_signature='s', sender_keyword="sender")
    def semodule_list(self, sender):
        """Return the standard output of ``semodule --list=full``.

        Raises OSError (with the command's stderr in the message) when the
        command exits with a non-zero status.
        """
        if not self.is_authorized(sender, "org.selinux.semodule_list"):
            raise dbus.exceptions.DBusException("Not authorized")
        p = Popen(["/usr/sbin/semodule", "--list=full"], stdout=PIPE, stderr=PIPE, universal_newlines=True)
        # Same reasoning as in customized(): drain stdout and stderr together.
        buf, err = p.communicate()
        if p.returncode:
            raise OSError("Failed to list SELinux modules: %s" % err)
        return buf

    #
    # The restorecon method modifies any file path to the default system label
    #
    @dbus.service.method("org.selinux", in_signature='s', sender_keyword="sender")
    def restorecon(self, path, sender):
        """Recursively run ``selinux.restorecon`` on ``path``."""
        if not self.is_authorized(sender, "org.selinux.restorecon"):
            raise dbus.exceptions.DBusException("Not authorized")
        selinux.restorecon(str(path), recursive=1)

    #
    # The setenforce method changes the current enforcement of SELinux
    #
    @dbus.service.method("org.selinux", in_signature='i', sender_keyword="sender")
    def setenforce(self, value, sender):
        """Pass the integer ``value`` to ``selinux.security_setenforce``."""
        if not self.is_authorized(sender, "org.selinux.setenforce"):
            raise dbus.exceptions.DBusException("Not authorized")
        selinux.security_setenforce(value)

    #
    # The relabel_on_boot method creates (value == 1) or removes (any other
    # value) the /.autorelabel flag file
    #
    @dbus.service.method("org.selinux", in_signature='i', sender_keyword="sender")
    def relabel_on_boot(self, value, sender):
        """Create or delete ``/.autorelabel`` depending on ``value``."""
        if not self.is_authorized(sender, "org.selinux.relabel_on_boot"):
            raise dbus.exceptions.DBusException("Not authorized")
        if value == 1:
            # Only the existence of the file matters; it is left empty.
            with open("/.autorelabel", "w"):
                pass
        else:
            try:
                os.unlink("/.autorelabel")
            except FileNotFoundError:
                # Already absent, nothing to undo.
                pass

    def write_selinux_config(self, enforcing=None, policy=None):
        """Rewrite the SELinux config file with new SELINUX/SELINUXTYPE lines.

        Lines starting with "SELINUX=" are replaced when ``enforcing`` is
        truthy, lines starting with "SELINUXTYPE=" when ``policy`` is truthy;
        every other line is copied unchanged. The result is written to a
        ".bck" sibling file which is then renamed over the original.
        """
        path = selinux.selinux_path() + "config"
        backup_path = path + ".bck"
        with open(path) as fd:
            lines = fd.readlines()
        with open(backup_path, "w") as fd:
            for l in lines:
                if enforcing and l.startswith("SELINUX="):
                    fd.write("SELINUX=%s\n" % enforcing)
                    continue
                if policy and l.startswith("SELINUXTYPE="):
                    fd.write("SELINUXTYPE=%s\n" % policy)
                    continue
                fd.write(l)
        # Swap the finished file into place only after it is fully written
        # and closed.
        os.rename(backup_path, path)

    #
    # The change_default_mode method modifies the enforcement mode stored in
    # the SELinux config file
    #
    @dbus.service.method("org.selinux", in_signature='s', sender_keyword="sender")
    def change_default_mode(self, value, sender):
        """Set SELINUX= in the config file.

        Raises ValueError unless ``value`` is "enforcing", "permissive" or
        "disabled".
        """
        if not self.is_authorized(sender, "org.selinux.change_default_mode"):
            raise dbus.exceptions.DBusException("Not authorized")
        values = ["enforcing", "permissive", "disabled"]
        if value not in values:
            raise ValueError("Enforcement mode must be %s" % ", ".join(values))
        self.write_selinux_config(enforcing=value)

    #
    # The change_default_policy method modifies the policy type
    #
    @dbus.service.method("org.selinux", in_signature='s', sender_keyword="sender")
    def change_default_policy(self, value, sender):
        """Set SELINUXTYPE= in the config file.

        Raises ValueError when ``value`` is not a plain directory name or
        when no directory of that name exists under ``selinux.selinux_path()``.
        """
        if not self.is_authorized(sender, "org.selinux.change_default_policy"):
            raise dbus.exceptions.DBusException("Not authorized")
        # The name is supplied by the D-Bus caller. Require a single path
        # component so that "", "..", or "a/../b" cannot satisfy the
        # directory check below and then be written into the config file.
        if not value or value in (".", "..") or "/" in value:
            raise ValueError("%s is not a valid policy name" % value)
        path = selinux.selinux_path() + value
        if not os.path.isdir(path):
            raise ValueError("%s does not exist" % path)
        return self.write_selinux_config(policy=value)
if __name__ == "__main__":
    # Make the GLib main loop the default for dbus before connecting.
    DBusGMainLoop(set_as_default=True)

    # Connect to the system bus, claim the service name and export the object.
    system_bus = dbus.SystemBus()
    name = dbus.service.BusName("org.selinux", system_bus)
    server = selinux_server(system_bus, "/org/selinux/object")
    # NOTE(review): `name` and `server` are never read again; presumably they
    # are bound only to keep the objects referenced while the loop runs.

    # Serve requests until the main loop is stopped.
    mainloop = GLib.MainLoop()
    mainloop.run()
|