File: test-unix.py

package info (click to toggle)
apparmor 4.1.6-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 29,884 kB
  • sloc: ansic: 24,945; python: 24,914; cpp: 9,140; sh: 8,175; yacc: 2,061; makefile: 1,908; lex: 1,215; pascal: 1,147; perl: 1,033; ruby: 365; lisp: 282; exp: 250; java: 212; xml: 159
file content (202 lines) | stat: -rw-r--r-- 14,407 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
#!/usr/bin/python3
# ----------------------------------------------------------------------
#    Copyright (C) 2024 Canonical, Ltd.
#
#    This program is free software; you can redistribute it and/or
#    modify it under the terms of version 2 of the GNU General Public
#    License as published by the Free Software Foundation.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
# ----------------------------------------------------------------------

import unittest
from common_test import AATest, setup_all_loops

from apparmor.common import AppArmorException
from apparmor.translations import init_translation

from apparmor.rule.unix import UnixRule

# Bind the callable returned by init_translation() to `_`; the expected
# logprof headers further down wrap their labels in `_()`.
_ = init_translation()


class UnixTestParse(AATest):
    """Parse valid unix rules and compare each result with a hand-built UnixRule."""

    tests = (
        #                   Rule                                     Accesses           Rule conds                        Local expr                    Peer expr                       Audit  Deny   Allow  Comment
        ('unix,',                                           UnixRule(UnixRule.ALL,      UnixRule.ALL,                     UnixRule.ALL,                 UnixRule.ALL,                   False, False, False, '')),
        ('unix rw,',                                        UnixRule('rw',              UnixRule.ALL,                     UnixRule.ALL,                 UnixRule.ALL,                   False, False, False, '')),
        ('unix (accept, rw),',                              UnixRule(('accept', 'rw'),  UnixRule.ALL,                     UnixRule.ALL,                 UnixRule.ALL,                   False, False, False, '')),
        ('unix peer=(addr=AA label=bb),',                   UnixRule(UnixRule.ALL,      UnixRule.ALL,                     UnixRule.ALL,                 {'addr': 'AA', 'label': 'bb'},  False, False, False, '')),
        ('unix opt=AA label=bb,',                           UnixRule(UnixRule.ALL,      UnixRule.ALL,                     {'opt': 'AA', 'label': 'bb'}, UnixRule.ALL,                   False, False, False, '')),
        ('unix (accept rw) type=AA protocol=BB,',           UnixRule(('accept', 'rw'),  {'type': 'AA', 'protocol': 'BB'}, UnixRule.ALL,                 UnixRule.ALL,                   False, False, False, '')),
        ('unix (accept, rw) protocol=AA type=BB,',          UnixRule(('accept', 'rw'),  {'type': 'BB', 'protocol': 'AA'}, UnixRule.ALL,                 UnixRule.ALL,                   False, False, False, '')),
        ('unix shutdown addr=@srv,',                        UnixRule('shutdown',        UnixRule.ALL,                     {'addr': '@srv'},             UnixRule.ALL,                   False, False, False, '')),
        ('unix send addr=@foo{a,b} peer=(label=splat),',    UnixRule('send',            UnixRule.ALL,                     {'addr': '@foo{a,b}'},        {'label': 'splat'},             False, False, False, '')),
        ('unix peer=(addr=@/tmp/foo-??????),',              UnixRule(UnixRule.ALL,      UnixRule.ALL,                     UnixRule.ALL,                 {'addr': '@/tmp/foo-??????'},   False, False, False, '')),
        ('unix peer=(addr="@/tmp/f o-??????"),',            UnixRule(UnixRule.ALL,      UnixRule.ALL,                     UnixRule.ALL,                 {'addr': '@/tmp/f o-??????'},   False, False, False, '')),
        ('unix peer=(addr=@/tmp/foo-*),',                   UnixRule(UnixRule.ALL,      UnixRule.ALL,                     UnixRule.ALL,                 {'addr': '@/tmp/foo-*'},        False, False, False, '')),
        ('unix (accept, rw) protocol=AA type=BB opt=AA label=bb peer=(addr=a label=bb),',
                                                            UnixRule(('accept', 'rw'),  {'type': 'BB', 'protocol': 'AA'}, {'opt': 'AA', 'label': 'bb'}, {'addr': 'a', 'label': 'bb'},   False, False, False, '')),  # noqa: E127
        ('unix peer=( label=la, addr="@/h"),',              UnixRule(UnixRule.ALL,      UnixRule.ALL,                     UnixRule.ALL,                {'addr': '@/h', 'label': 'la,'}, False, False, False, '')),
        ('unix peer=(addr="@/h o", label="l a"),',          UnixRule(UnixRule.ALL,      UnixRule.ALL,                     UnixRule.ALL,              {'addr': '@/h o', 'label': 'l a'}, False, False, False, '')),
        ('unix addr="@/h" label=la,',                       UnixRule(UnixRule.ALL,      UnixRule.ALL,                     {'addr': '@/h', 'label': 'la'},    UnixRule.ALL,              False, False, False, '')),
        ('unix addr="@/h o" label="l a",',                  UnixRule(UnixRule.ALL,      UnixRule.ALL,                     {'addr': '@/h o', 'label': 'l a'}, UnixRule.ALL,              False, False, False, '')),
    )

    def _run_test(self, rawrule, expected):
        """Parse *rawrule* and require that it equals the *expected* UnixRule.

        rawrule:  rule text, the first column of the ``tests`` table
        expected: UnixRule instance built by hand, the second column
        """
        self.assertTrue(UnixRule.match(rawrule))
        obj = UnixRule.create_instance(rawrule)
        # The hand-built rule gets the stripped raw text before the comparison;
        # presumably the strict is_equal() also looks at raw_rule - confirm.
        expected.raw_rule = rawrule.strip()
        self.assertTrue(obj.is_equal(expected, True), f'\n  {rawrule}   expected,\n  {obj.get_clean()}   returned by obj.get_clean()\n  {expected.get_clean()}   returned by expected.get_clean()')

    def test_diff_local(self):
        # Rules that differ in their local expression must not compare equal.
        obj1 = UnixRule('send', UnixRule.ALL, {'addr': 'foo'}, UnixRule.ALL, )

        # Accesses, rule conds and peer are identical here; only the local
        # address differs, so this isolates the local-expression comparison.
        obj2 = UnixRule('send', UnixRule.ALL, {'addr': 'bar'}, UnixRule.ALL)
        self.assertFalse(obj1.is_equal(obj2, False))

        # Original scenario, kept for coverage: the address sits on the peer
        # side instead of the local side.
        obj3 = UnixRule('send', UnixRule.ALL, UnixRule.ALL, {'addr': 'bar'})
        self.assertFalse(obj1.is_equal(obj3, False))

    def test_diff_peer(self):
        # Rules that differ only in the peer address must not compare equal.
        obj1 = UnixRule('send', UnixRule.ALL, UnixRule.ALL, {'addr': 'foo'})
        obj2 = UnixRule('send', UnixRule.ALL, UnixRule.ALL, {'addr': 'bar'})
        self.assertFalse(obj1.is_equal(obj2, False))


class UnixTestParseInvalid(AATest):
    """Rule texts and constructor arguments for which an exception is expected."""

    tests = (
        ('unix invalid,',   AppArmorException),
        ('unix (invalid),', AppArmorException),
    )

    def _run_test(self, rawrule, expected):
        """Require that *rawrule* matches UnixRule.match() yet raises *expected* when parsed."""
        # the above invalid rules still match the main regex!
        matched = UnixRule.match(rawrule)
        self.assertTrue(matched)
        with self.assertRaises(expected):
            UnixRule.create_instance(rawrule)

    def test_parse_fail(self):
        # Text that does not start with the 'unix' keyword.
        not_a_unix_rule = 'foo,'
        with self.assertRaises(AppArmorException):
            UnixRule.create_instance(not_a_unix_rule)

    def test_invalid_key(self):
        # 'invalid' as a key of the local expression.
        bad_local_expr = {'invalid': 'whatever'}
        with self.assertRaises(AppArmorException):
            UnixRule('send', UnixRule.ALL, bad_local_expr, UnixRule.ALL, False, False, False, '')

    def test_invalid_access(self):
        # A single access string 'invalid'.
        bad_access = 'invalid'
        with self.assertRaises(AppArmorException):
            UnixRule(bad_access, UnixRule.ALL, UnixRule.ALL, UnixRule.ALL, False, False, False, '')

    def test_invalid_access2(self):
        # 'rw' combined with 'invalid' in one access tuple.
        mixed_accesses = ('rw', 'invalid')
        with self.assertRaises(AppArmorException):
            UnixRule(mixed_accesses, UnixRule.ALL, UnixRule.ALL, UnixRule.ALL, False, False, False, '')

    def test_invalid_peer_expr(self):
        # 'create' access given together with a peer expression.
        peer_expr = {'addr': 'foo'}
        with self.assertRaises(AppArmorException):
            UnixRule('create', UnixRule.ALL, UnixRule.ALL, peer_expr, False, False, False, '')


class UnixIsCoveredTest(AATest):
    """Check UnixRule.is_covered() against tables of candidate rules."""

    def _assert_coverage(self, obj, cases, expect_covered):
        """Compare every tuple in *cases* against *obj*.

        obj:            the UnixRule whose is_covered()/is_equal() is called
        cases:          iterable of argument tuples passed to UnixRule(*case)
        expect_covered: True if obj.is_covered() must be truthy for every case,
                        False if it must be falsy

        Each case runs in its own subTest and carries the tuple as assertion
        message, so one failing tuple neither hides the remaining ones nor
        leaves the reader guessing which tuple failed.
        """
        for case in cases:
            with self.subTest(case=case):
                candidate = UnixRule(*case)
                if expect_covered:
                    self.assertTrue(obj.is_covered(candidate), case)
                else:
                    self.assertFalse(obj.is_covered(candidate), case)
                # none of the table entries is identical to obj
                self.assertFalse(obj.is_equal(candidate), case)

    def test_is_covered(self):
        obj = UnixRule(('accept', 'rw'), {'type': 'F*', 'protocol': 'AA'}, {'addr': 'AA'}, {'addr': 'AA', 'label': 'bb'})
        tests = [
            (('accept',),       {'type': 'F*', 'protocol': 'AA'},   {'opt': 'AA', 'label': 'bb'},   {'addr': 'AA', 'label': 'bb'}),
            (('accept', 'rw'),  {'type': 'F*'},                     {'opt': 'AA', 'label': 'bb'},   {'addr': 'AA', 'label': 'bb'}),
            (('accept', 'rw'),  {'type': 'Foo'},                    {'addr': 'AA'},                 {'addr': 'AA', 'label': 'bb'}),
            (('accept', 'rw'),  {'type': 'Foo'},                    {'addr': 'AA', 'opt': 'BB'},    {'addr': 'AA', 'label': 'bb'})
        ]
        self._assert_coverage(obj, tests, True)

    def test_is_covered2(self):
        # Same candidates as test_is_covered, but obj leaves the rule conds at ALL.
        obj = UnixRule(('accept', 'rw'), UnixRule.ALL, {'addr': 'AA'}, {'addr': 'AA', 'label': 'bb'})
        tests = [
            (('accept',),       {'type': 'F*', 'protocol': 'AA'},   {'opt': 'AA', 'label': 'bb'},   {'addr': 'AA', 'label': 'bb'}),
            (('accept', 'rw'),  {'type': 'F*'},                     {'opt': 'AA', 'label': 'bb'},   {'addr': 'AA', 'label': 'bb'}),
            (('accept', 'rw'),  {'type': 'Foo'},                    {'addr': 'AA'},                 {'addr': 'AA', 'label': 'bb'}),
            (('accept', 'rw'),  {'type': 'Foo'},                    {'addr': 'AA', 'opt': 'BB'},    {'addr': 'AA', 'label': 'bb'})
        ]
        self._assert_coverage(obj, tests, True)

    def test_is_not_covered(self):
        obj = UnixRule(('accept', 'rw'), {'type': 'F'}, {'opt': 'AA'}, {'addr': 'AA', 'label': 'bb'})
        tests = [
            (('r',),            {'type': 'F*', 'protocol': 'AA'},   {'opt': 'AA', 'label': 'bb'},   {'addr': 'AA', 'label': 'bb'}),
            (('accept', 'rw'),  {'type': 'B'},                      {'opt': 'AA', 'label': 'bb'},   {'addr': 'AA', 'label': 'bb'}),
            (('accept', 'rw'),  {'type': 'F'},                      {'opt': 'AA', 'label': 'bb'},   UnixRule.ALL),
            (('accept', 'rw'),  {'type': 'F'},                      {'opt': 'notcovered'},          {'addr': 'AA', 'label': 'bb'}),
            (('accept', 'rw'),  {'type': 'F'},                      {'opt': 'AA'},                  {'addr': 'notcovered'}),
        ]
        self._assert_coverage(obj, tests, False)

    def test_is_covered_priority(self):
        # Two otherwise identical rules that differ only in their priority:
        # the priority=1 rule covers the priority=0 rule, but not vice versa.
        obj = UnixRule(('accept', 'rw'), {'type': 'F*', 'protocol': 'AA'}, {'addr': 'AA'}, {'addr': 'AA', 'label': 'bb'}, priority=0)
        prio_obj = UnixRule(('accept', 'rw'), {'type': 'F*', 'protocol': 'AA'}, {'addr': 'AA'}, {'addr': 'AA', 'label': 'bb'}, priority=1)

        self.assertFalse(obj.is_covered(prio_obj))
        self.assertTrue(prio_obj.is_covered(obj))


class UnixLogprofHeaderTest(AATest):
    """Compare UnixRule.logprof_header() with the expected label/value list."""

    tests = (
        ('unix,',                                           [_('Accesses'), 'ALL',  _('Rule'), 'ALL', _('Local'), 'ALL',                        _('Peer'), 'ALL']),
        ('unix rw,',                                        [_('Accesses'), 'rw',   _('Rule'), 'ALL', _('Local'), 'ALL',                        _('Peer'), 'ALL']),
        # NOTE(review): the '{' in the addr below is never closed - presumably on purpose; confirm.
        ('unix send addr=@foo{one,two peer=(label=splat),', [_('Accesses'), 'send', _('Rule'), 'ALL', _('Local'), {'addr': '@foo{one,two'},     _('Peer'), {'label': 'splat'}]),
    )

    def _run_test(self, params, expected):
        """Parse the rule text *params* and compare its logprof header with *expected*."""
        header = UnixRule.create_instance(params).logprof_header()
        self.assertEqual(header, expected)


class UnixTestGlob(AATest):
    """Check the result of UnixRule.glob() step by step."""

    def test_glob(self):
        # Every tuple is a chain: parsing entry N, calling glob() once and
        # cleaning the rule has to give entry N+1.
        glob_list = [(
            'unix (accept, rw) type=BB protocol=AA label=bb opt=AA peer=(addr=a label=bb),',
            'unix (accept, rw) type=BB protocol=AA label=bb opt=AA,',
            'unix (accept, rw) type=BB protocol=AA,',
            'unix (accept, rw),',
            'unix,',
        )]
        for chain in glob_list:
            # pair each entry with its successor; the last entry has none
            for before, after in zip(chain, chain[1:]):
                rule = UnixRule.create_instance(before)
                rule.glob()
                self.assertEqual(rule.get_clean(), after)


class UnixTestClean(AATest):
    """Check get_clean() and get_raw() for rules written with irregular whitespace."""

    tests = (
        ('     audit  unix                                                                                                                   ,    # foo  ', 'audit unix, # foo'),
        ('     audit deny unix                                                   label  =  foo                                               ,           ', 'audit deny unix label=foo,'),
        ('     audit allow unix                                                  peer  =  (addr  =  a)                                       ,    # foo  ', 'audit allow unix peer=(addr=a), # foo'),
        ('     deny unix                                                   type  =  foo                                                      ,           ', 'deny unix type=foo,'),
        ('     allow unix                                                              peer  =  (label=bb)                                   ,    # foo  ', 'allow unix peer=(label=bb), # foo'),
        ('     unix                                                                                                                          ,    # foo  ', 'unix, # foo'),
        ('     unix                                                   addr  =  foo                                                           ,           ', 'unix addr=foo,'),
        ('     unix    (  accept  , rw)  protocol  =  AA  type =  BB  opt  =  myopt  label  =  bb peer  =  (addr  =  a label  =  bb )        ,           ', 'unix (accept, rw) type=BB protocol=AA label=bb opt=myopt peer=(addr=a label=bb),'),
        ('priority=-42 unix    (  accept  , rw)  protocol  =  AA  type =  BB  opt  =  myopt  label  =  bb peer  =  (addr  =  a label  =  bb ),           ', 'priority=-42 unix (accept, rw) type=BB protocol=AA label=bb opt=myopt peer=(addr=a label=bb),'),
        ('priority = 0 unix    (  accept  , rw)  protocol  =  AA  type =  BB  opt  =  myopt  label  =  bb peer  =  (addr  =  a label  =  bb ),           ', 'priority=0 unix (accept, rw) type=BB protocol=AA label=bb opt=myopt peer=(addr=a label=bb),'),
        ('priority=211 unix    (  accept  , rw)  protocol  =  AA  type =  BB  opt  =  myopt  label  =  bb peer  =  (addr  =  a label  =  bb ),           ', 'priority=211 unix (accept, rw) type=BB protocol=AA label=bb opt=myopt peer=(addr=a label=bb),'),
        ('priority=+45 unix    (  accept  , rw)  protocol  =  AA  type =  BB  opt  =  myopt  label  =  bb peer  =  (addr  =  a label  =  bb ),           ', 'priority=45 unix (accept, rw) type=BB protocol=AA label=bb opt=myopt peer=(addr=a label=bb),'),
    )

    def _run_test(self, rawrule, expected):
        """Parse *rawrule*; get_clean() must equal *expected*, get_raw() the stripped input."""
        self.assertTrue(UnixRule.match(rawrule))
        rule = UnixRule.create_instance(rawrule)
        # both getters are called before the first comparison
        cleaned, raw_text = rule.get_clean(), rule.get_raw()

        self.assertEqual(expected, cleaned, 'unexpected clean rule')
        self.assertEqual(rawrule.strip(), raw_text, 'unexpected raw rule')


# NOTE(review): presumably expands each class's `tests` table into individual
# test cases that call `_run_test` - confirm in common_test.
setup_all_loops(__name__)
if __name__ == '__main__':
    # Run the tests when this file is executed directly.
    unittest.main(verbosity=1)