1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
|
# SPDX-License-Identifier: LGPL-3.0-or-later
__author__ = "Martin Pitt"
__copyright__ = """
(c) 2013 Canonical Ltd.
(c) 2017 - 2022 Martin Pitt <martin@piware.de>
"""
import shutil
import subprocess
import sys
import unittest
import dbus
import dbus.mainloop.glib
import dbusmock
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
have_pkcheck = shutil.which("pkcheck")
@unittest.skipUnless(have_pkcheck, "pkcheck not installed")
class TestPolkit(dbusmock.DBusTestCase):
"""Test mocking polkitd"""
@classmethod
def setUpClass(cls):
cls.start_system_bus()
cls.dbus_con = cls.get_dbus(True)
def setUp(self):
(self.p_mock, self.obj_polkitd) = self.spawn_server_template("polkitd", {}, stdout=subprocess.PIPE)
self.addCleanup(self.p_mock.wait)
self.addCleanup(self.p_mock.terminate)
self.addCleanup(self.p_mock.stdout.close)
self.dbusmock = dbus.Interface(self.obj_polkitd, dbusmock.MOCK_IFACE)
def test_default(self):
self.check_action("org.freedesktop.test.frobnicate", False)
def test_allow_unknown(self):
self.dbusmock.AllowUnknown(True)
self.check_action("org.freedesktop.test.frobnicate", True)
self.dbusmock.AllowUnknown(False)
self.check_action("org.freedesktop.test.frobnicate", False)
def test_set_allowed(self):
self.dbusmock.SetAllowed(["org.freedesktop.test.frobnicate", "org.freedesktop.test.slap"])
self.check_action("org.freedesktop.test.frobnicate", True)
self.check_action("org.freedesktop.test.slap", True)
self.check_action("org.freedesktop.test.wobble", False)
def test_hanging_call(self):
self.dbusmock.SimulateHang(True)
self.assertFalse(self.dbusmock.HaveHangingCalls())
pkcheck = self.check_action_run("org.freedesktop.test.frobnicate")
with self.assertRaises(subprocess.TimeoutExpired):
pkcheck.wait(0.8)
self.assertTrue(self.dbusmock.HaveHangingCalls())
pkcheck.stdout.close()
pkcheck.kill()
pkcheck.wait()
def test_hanging_call_return(self):
self.dbusmock.SetAllowed(["org.freedesktop.test.frobnicate"])
self.dbusmock.SimulateHangActions(["org.freedesktop.test.frobnicate", "org.freedesktop.test.slap"])
self.assertFalse(self.dbusmock.HaveHangingCalls())
frobnicate_pkcheck = self.check_action_run("org.freedesktop.test.frobnicate")
slap_pkcheck = self.check_action_run("org.freedesktop.test.slap")
with self.assertRaises(subprocess.TimeoutExpired):
frobnicate_pkcheck.wait(0.3)
with self.assertRaises(subprocess.TimeoutExpired):
slap_pkcheck.wait(0.3)
self.assertTrue(self.dbusmock.HaveHangingCalls())
self.dbusmock.ReleaseHangingCalls()
self.check_action_result(frobnicate_pkcheck, True)
self.check_action_result(slap_pkcheck, False)
def test_delayed_call(self):
self.dbusmock.SetDelay(3)
pkcheck = self.check_action_run("org.freedesktop.test.frobnicate")
with self.assertRaises(subprocess.TimeoutExpired):
pkcheck.wait(0.8)
pkcheck.stdout.close()
pkcheck.kill()
pkcheck.wait()
def test_delayed_call_return(self):
self.dbusmock.SetDelay(1)
self.dbusmock.SetAllowed(["org.freedesktop.test.frobnicate"])
pkcheck = self.check_action_run("org.freedesktop.test.frobnicate")
with self.assertRaises(subprocess.TimeoutExpired):
pkcheck.wait(0.8)
self.check_action_result(pkcheck, True)
@staticmethod
def check_action_run(action):
# pylint: disable=consider-using-with
return subprocess.Popen(
["pkcheck", "--action-id", action, "--process", "123"],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
)
def check_action_result(self, pkcheck, expect_allow):
out = pkcheck.communicate()[0]
if expect_allow:
self.assertEqual(pkcheck.returncode, 0)
self.assertEqual(out, "test=test\n")
else:
self.assertNotEqual(pkcheck.returncode, 0)
self.assertEqual(out, "test=test\nNot authorized.\n")
def check_action(self, action, expect_allow):
self.check_action_result(self.check_action_run(action), expect_allow)
if __name__ == "__main__":
# avoid writing to stderr
unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout))
|