1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243
|
#! @PYTHON@
# ------------------------------------------------------------------
#
# Copyright (C) 2013 Canonical Ltd.
# Author: Steve Beattie <steve@nxnw.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of version 2 of the GNU General Public
# License published by the Free Software Foundation.
#
# ------------------------------------------------------------------
import ctypes
import os
import unittest
import LibAppArmor as libapparmor
TESTDIR = "../../../testsuite/test_multi"
# map of testsuite .out entries that aren't a simple to_lower() and
# s/ /_/ of the field name
OUTPUT_MAP = {
'Event type': 'event',
'Mask': 'requested_mask',
'Command': 'comm',
'Token': 'magic_token',
'ErrorCode': 'error_code',
'Network family': 'net_family',
'Socket type': 'net_sock_type',
'Protocol': 'net_protocol',
'Local addr': 'net_local_addr',
'Foreign addr': 'net_foreign_addr',
'Local port': 'net_local_port',
'Foreign port': 'net_foreign_port',
'Audit subid': 'audit_sub_id',
'Class': '_class',
}
# FIXME: pull this automatically out of LibAppArmor, but swig
# classes aren't real enough. (Perhaps we're not using swig correctly)
EVENT_MAP = {
libapparmor.AA_RECORD_ALLOWED: 'AA_RECORD_ALLOWED',
libapparmor.AA_RECORD_AUDIT: 'AA_RECORD_AUDIT',
libapparmor.AA_RECORD_DENIED: 'AA_RECORD_DENIED',
libapparmor.AA_RECORD_ERROR: 'AA_RECORD_ERROR',
libapparmor.AA_RECORD_HINT: 'AA_RECORD_HINT',
libapparmor.AA_RECORD_INVALID: 'AA_RECORD_INVALID',
libapparmor.AA_RECORD_STATUS: 'AA_RECORD_STATUS',
}
# default is None if not in this table
NO_VALUE_MAP = {
'fsuid': int(ctypes.c_ulong(-1).value),
'ouid': int(ctypes.c_ulong(-1).value),
}
class AAPythonBindingsTests(unittest.TestCase):
def setUp(self):
# REPORT ALL THE OUTPUT
self.maxDiff = None
def test_aa_splitcon(self):
AA_SPLITCON_EXPECT = [
("unconfined", "unconfined", None),
("unconfined\n", "unconfined", None),
("/bin/ping (enforce)", "/bin/ping", "enforce"),
("/bin/ping (enforce)\n", "/bin/ping", "enforce"),
("/usr/sbin/rsyslog (complain)", "/usr/sbin/rsyslog", "complain"),
]
for context, expected_label, expected_mode in AA_SPLITCON_EXPECT:
actual_label, actual_mode = libapparmor.aa_splitcon(context)
if expected_label is None:
self.assertIsNone(actual_label)
else:
self.assertIsInstance(actual_label, str)
self.assertEqual(expected_label, actual_label)
if expected_mode is None:
self.assertIsNone(actual_mode)
else:
self.assertIsInstance(actual_mode, str)
self.assertEqual(expected_mode, actual_mode)
with self.assertRaises(ValueError):
libapparmor.aa_splitcon("")
def test_aa_is_enabled(self):
aa_enabled = libapparmor.aa_is_enabled()
self.assertIsInstance(aa_enabled, bool)
@unittest.skipUnless(libapparmor.aa_is_enabled(), "AppArmor is not enabled")
def test_aa_find_mountpoint(self):
mount_point = libapparmor.aa_find_mountpoint()
self.assertIsInstance(mount_point, str)
self.assertGreater(len(mount_point), 0, "mount point should not be empty")
self.assertTrue(os.path.isdir(mount_point))
# TODO: test commented out functions (or at least their prototypes)
# extern int aa_change_profile(const char *profile);
# extern int aa_change_onexec(const char *profile);
@unittest.skipUnless(libapparmor.aa_is_enabled(), "AppArmor is not enabled")
def test_change_hats(self):
# Changing hats will fail because we have no valid hats to change to
# However, we still verify that we get an OSError instead of a TypeError
with self.assertRaises(OSError):
libapparmor.aa_change_hat("nonexistent_profile", 12345678)
with self.assertRaises(OSError):
libapparmor.aa_change_hatv(["nonexistent_1", "nonexistent_2"], 0xabcdef)
libapparmor.aa_change_hatv(("nonexistent_1", "nonexistent_2"), 0xabcdef)
# extern int aa_stack_profile(const char *profile);
# extern int aa_stack_onexec(const char *profile);
# extern int aa_getprocattr(pid_t tid, const char *attr, char **label, char **mode);
# extern int aa_gettaskcon(pid_t target, char **label, char **mode);
@unittest.skipUnless(libapparmor.aa_is_enabled(), "AppArmor is not enabled")
def test_aa_gettaskcon(self):
# Our test harness should be running us as unconfined
# Get our own pid and this should be equivalent to aa_getcon
pid = os.getpid()
label, mode = libapparmor.aa_gettaskcon(pid)
self.assertEqual(label, "unconfined", "aa_gettaskcon label should be unconfined")
self.assertIsNone(mode, "aa_gettaskcon mode should be unconfined")
@unittest.skipUnless(libapparmor.aa_is_enabled(), "AppArmor is not enabled")
def test_aa_getcon(self):
# Our test harness should be running us as unconfined
label, mode = libapparmor.aa_getcon()
self.assertEqual(label, "unconfined", "aa_getcon label should be unconfined")
self.assertIsNone(mode, "aa_getcon mode should be unconfined")
# extern int aa_getpeercon(int fd, char **label, char **mode);
# extern int aa_query_file_path(uint32_t mask, const char *label,
# const char *path, int *allowed, int *audited);
@unittest.skipUnless(libapparmor.aa_is_enabled(), "AppArmor is not enabled")
def test_aa_query_file_path(self):
aa_query_mask = libapparmor.AA_MAY_EXEC | libapparmor.AA_MAY_READ | libapparmor.AA_MAY_WRITE
allowed, audited = libapparmor.aa_query_file_path(aa_query_mask, "unconfined", "/tmp/hello")
self.assertTrue(allowed)
self.assertFalse(audited)
# extern int aa_query_link_path(const char *label, const char *target,
# const char *link, int *allowed, int *audited);
class AALogParsePythonBindingsTests(unittest.TestCase):
def setUp(self):
# REPORT ALL THE OUTPUT
self.maxDiff = None
def _runtest(self, testname):
infile = testname + ".in"
outfile = testname + ".out"
# infile *should* only contain one line
with open(os.path.join(TESTDIR, infile), 'r') as f:
line = f.read()
swig_record = libapparmor.parse_record(line)
record = self.create_record_dict(swig_record)
record['file'] = infile
libapparmor.free_record(swig_record)
expected = self.parse_output_file(outfile)
self.assertEqual(expected, record,
"expected records did not match\n"
"expected = {}\nactual = {}".format(expected, record))
def parse_output_file(self, outfile):
"""parse testcase .out file and return dict"""
output = dict()
with open(os.path.join(TESTDIR, outfile), 'r') as f:
lines = f.readlines()
count = 0
for l in lines:
line = l.rstrip('\n')
count += 1
if line == "START":
self.assertEqual(count, 1,
"Unexpected output format in " + outfile)
continue
else:
key, value = line.split(": ", 1)
if key in OUTPUT_MAP:
newkey = OUTPUT_MAP[key]
else:
newkey = key.lower().replace(' ', '_')
# check if entry already exists?
output[newkey] = value
return output
def create_record_dict(self, record):
"""parse the swig created record and construct a dict from it"""
new_record = dict()
for key in [x for x in dir(record) if not (x.startswith('__') or x == 'this')]:
value = getattr(record, key)
if key == "event" and value in EVENT_MAP:
new_record[key] = EVENT_MAP[value]
elif key == "version":
# FIXME: out files should report log version?
# FIXME: or can we just deprecate v1 logs?
continue
elif key == "thisown":
# SWIG generates this key to track memory allocation
continue
elif key in NO_VALUE_MAP:
if NO_VALUE_MAP[key] == value:
continue
else:
new_record[key] = str(value)
elif value or value == '':
new_record[key] = str(value)
return new_record
def find_testcases(testdir):
"""dig testcases out of passed directory"""
for f in os.listdir(testdir):
if f.endswith(".in"):
yield f[:-3]
def main():
for f in find_testcases(TESTDIR):
def stub_test(self, testname=f):
self._runtest(testname)
stub_test.__doc__ = "test " + f
setattr(AALogParsePythonBindingsTests, 'test_' + f, stub_test)
return unittest.main(verbosity=2)
if __name__ == "__main__":
main()
|