1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
import unittest
import os
import pwd
import grp
import subprocess
from tempfile import mkstemp
from time import sleep
from os.path import split
# Numeric IDs of the "nobody" account used by the uid/gid tests.  When
# /etc/debian_version exists the group is looked up under the name "nogroup";
# otherwise it is looked up under the name "nobody".
NOBODY_UID = pwd.getpwnam("nobody").pw_uid
NOBODY_GID = grp.getgrnam(
    "nogroup" if os.path.exists("/etc/debian_version") else "nobody"
).gr_gid
class DaemonizeTest(unittest.TestCase):
    """Launch tests/daemon_sigterm.py and check that its pidfile is on disk."""

    def setUp(self):
        """Create a pidfile path and start the daemon script with it."""
        # mkstemp() returns an already-open descriptor together with the
        # path.  Only the path is used, so close the descriptor immediately
        # instead of leaking one per test.
        pid_fd, self.pidfile = mkstemp()
        os.close(pid_fd)
        os.system("python3 tests/daemon_sigterm.py %s" % self.pidfile)
        sleep(.1)

    def tearDown(self):
        """Send `kill` (default signal) to the PID(s) read from the pidfile."""
        os.system("kill `cat %s`" % self.pidfile)
        sleep(.1)

    def test_pidfile_presense(self):
        # NOTE(review): mkstemp() already created the file, so this
        # presumably verifies the daemon has not removed it while running --
        # confirm against tests/daemon_sigterm.py.
        sleep(10)
        self.assertTrue(os.path.isfile(self.pidfile))
class LockingTest(unittest.TestCase):
    """Check that a second daemon started on the same pidfile is refused."""

    def setUp(self):
        """Create a pidfile path and start the first daemon instance."""
        # mkstemp() returns an already-open descriptor together with the
        # path.  Only the path is used, so close the descriptor immediately
        # instead of leaking one per test.
        pid_fd, self.pidfile = mkstemp()
        os.close(pid_fd)
        print("First daemonize process started")
        os.system("python3 tests/daemon_sigterm.py %s" % self.pidfile)
        sleep(.1)

    def tearDown(self):
        """Send `kill` (default signal) to the PID(s) read from the pidfile."""
        os.system("kill `cat %s`" % self.pidfile)
        sleep(.1)

    def test_locking(self):
        sleep(10)
        print("Attempting to start second daemonize process")
        # subprocess.call() returns the child's exit status; the second
        # launch on the same pidfile is expected to exit with status 1.
        exit_status = subprocess.call(
            ["python3", "tests/daemon_sigterm.py", self.pidfile]
        )
        self.assertEqual(exit_status, 1)
class KeepFDsTest(unittest.TestCase):
    """Run tests/daemon_keep_fds.py and check what it wrote to the log file."""

    def setUp(self):
        """Create pidfile and logfile paths, then start the daemon script."""
        # mkstemp() returns an already-open descriptor together with the
        # path.  Only the paths are used, so close both descriptors right
        # away instead of leaking them.
        pid_fd, self.pidfile = mkstemp()
        os.close(pid_fd)
        log_fd, self.logfile = mkstemp()
        os.close(log_fd)
        os.system("python3 tests/daemon_keep_fds.py %s %s" % (self.pidfile, self.logfile))
        sleep(1)

    def tearDown(self):
        """Kill the PID(s) listed in the pidfile and delete the log file."""
        os.system("kill `cat %s`" % self.pidfile)
        os.remove(self.logfile)
        sleep(.1)

    def test_keep_fds(self):
        # Read through a context manager so the file handle is closed
        # deterministically rather than left to the garbage collector.
        with open(self.logfile, "r") as log_file:
            log = log_file.read()
        self.assertEqual(log, "Test\n")
class UidGidTest(unittest.TestCase):
    """Run the uid/gid daemon scripts and compare the IDs they log.

    Both tests need root (they chown files to the "nobody" account) and are
    reported as skipped for any other user.
    """

    def setUp(self):
        """Build the expected log line and create pidfile/logfile paths."""
        # Expected log content: the nobody UID twice, then the nobody GID
        # twice, separated by single spaces.
        self.expected = " ".join(map(str, [NOBODY_UID] * 2 + [NOBODY_GID] * 2))
        # mkstemp() returns an already-open descriptor together with the
        # path.  Only the paths are used, so close both descriptors right
        # away instead of leaking them.
        pid_fd, self.pidfile = mkstemp()
        os.close(pid_fd)
        log_fd, self.logfile = mkstemp()
        os.close(log_fd)

    def tearDown(self):
        """Delete the log file and, if it is still there, the pidfile."""
        os.remove(self.logfile)
        # The pidfile may already be gone (test_uid_gid asserts that it is),
        # but it is still present when a test was skipped, so removal is
        # best-effort.
        try:
            os.remove(self.pidfile)
        except FileNotFoundError:
            pass

    def test_uid_gid(self):
        # Report a real skip instead of returning True, which made the test
        # count as passed for non-root users.
        if os.getuid() != 0:
            self.skipTest("requires root privileges")
        os.chown(self.logfile, NOBODY_UID, NOBODY_GID)
        os.system("python3 tests/daemon_uid_gid.py %s %s" % (self.pidfile, self.logfile))
        sleep(1)
        with open(self.logfile, "r") as f:
            self.assertEqual(f.read(), self.expected)
        self.assertFalse(os.access(self.pidfile, os.F_OK))

    def test_uid_gid_action(self):
        # Report a real skip instead of returning True, which made the test
        # count as passed for non-root users.
        if os.getuid() != 0:
            self.skipTest("requires root privileges")
        os.chown(self.pidfile, NOBODY_UID, NOBODY_GID)
        os.system("python3 tests/daemon_uid_gid_action.py %s %s" % (self.pidfile, self.logfile))
        sleep(1)
        with open(self.logfile, "r") as f:
            self.assertEqual(f.read(), self.expected)
class PrivilegedActionTest(unittest.TestCase):
    """Run tests/daemon_privileged_action.py and check the order of its log."""

    def setUp(self):
        """Record the expected log, create temp paths and start the daemon."""
        self.correct_log = (
            "Privileged action.\n"
            "Starting daemon.\n"
            "Action.\n"
            "Stopping daemon.\n"
        )
        # mkstemp() returns an already-open descriptor together with the
        # path.  Only the paths are used, so close both descriptors right
        # away instead of leaking them.
        pid_fd, self.pidfile = mkstemp()
        os.close(pid_fd)
        log_fd, self.logfile = mkstemp()
        os.close(log_fd)
        os.system("python3 tests/daemon_privileged_action.py %s %s" % (self.pidfile, self.logfile))
        sleep(.1)

    def tearDown(self):
        """Kill the PID(s) listed in the pidfile and delete the log file."""
        os.system("kill `cat %s`" % self.pidfile)
        sleep(.1)
        # The log file was created by this test and was previously left
        # behind in the temp directory; remove it on a best-effort basis.
        try:
            os.remove(self.logfile)
        except FileNotFoundError:
            pass

    def test_privileged_action(self):
        sleep(5)
        with open(self.logfile, "r") as contents:
            self.assertEqual(contents.read(), self.correct_log)
class ChdirTest(unittest.TestCase):
    """Run tests/daemon_chdir.py and check the content of its target file."""

    def setUp(self):
        """Create temp paths and start the daemon with directory + file name."""
        # mkstemp() returns an already-open descriptor together with the
        # path.  Only the paths are used, so close both descriptors right
        # away instead of leaking them.
        pid_fd, self.pidfile = mkstemp()
        os.close(pid_fd)
        target_fd, self.target = mkstemp()
        os.close(target_fd)
        # The script receives the target's directory and file name as two
        # separate arguments.
        directory, filename = split(self.target)
        os.system("python3 tests/daemon_chdir.py %s %s %s" % (self.pidfile, directory, filename))
        sleep(1)

    def tearDown(self):
        """Kill the PID(s) listed in the pidfile and delete the target file."""
        os.system("kill `cat %s`" % self.pidfile)
        sleep(.1)
        # The target file was created by this test and was previously left
        # behind in the temp directory; remove it on a best-effort basis.
        try:
            os.remove(self.target)
        except FileNotFoundError:
            pass

    # NOTE(review): the method name looks copied from KeepFDsTest; it is kept
    # unchanged so existing test IDs keep working.
    def test_keep_fds(self):
        # Read through a context manager so the file handle is closed
        # deterministically rather than left to the garbage collector.
        with open(self.target, "r") as target_file:
            log = target_file.read()
        self.assertEqual(log, "test")
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
|