File: test_privileged.py

package info (click to toggle)
freedombox 26.3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 83,092 kB
  • sloc: python: 48,542; javascript: 1,730; xml: 481; makefile: 290; sh: 137; php: 32
file content (413 lines) | stat: -rw-r--r-- 13,557 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Test module to exercise user actions.

it is recommended to run this module with root privileges in a virtual machine.
"""

import random
import re
import string
import subprocess

import pytest

from plinth import action_utils
from plinth.modules.users import privileged
from plinth.tests import config as test_config

_cleanup_users = None
_cleanup_groups = None

# Temporary admin user created if an admin doesn't already exist
PYTEST_ADMIN_USERNAME = 'pytest_admin'


def _is_ldap_set_up():
    """Return whether LDAP is set up."""
    try:
        return subprocess.call([
            'ldapsearch', '-Y', 'EXTERNAL', '-H', 'ldapi:///', '-b',
            'ou=groups,dc=thisbox'
        ]) == 0
    except FileNotFoundError:
        return False


pytestmark: list[pytest.MarkDecorator] = [
    pytest.mark.usefixtures('needs_root', 'load_cfg', 'mock_privileged'),
    pytest.mark.skipif(not _is_ldap_set_up(), reason='LDAP is not configured')
]
privileged_modules_to_mock = [
    'plinth.modules.users.privileged', 'plinth.modules.security.privileged'
]


def _random_string():
    """Return a random string created from lower case ascii."""
    random_chars = [random.choice(string.ascii_lowercase) for _ in range(8)]
    return 'test_' + ''.join(random_chars)


def _get_password_hash(username):
    """Query and return the password hash of the given LDAP username"""
    query = [
        'ldapsearch', '-L', '-L', '-L', '-Y', 'EXTERNAL', '-H', 'ldapi:///',
        '-b', 'ou=users,dc=thisbox', '-Q', '(cn={})'.format(username),
        'userPassword'
    ]
    process = subprocess.run(query, stdout=subprocess.PIPE,
                             stderr=subprocess.DEVNULL, check=True)
    return process.stdout.decode().strip().split()[-1]


def _get_samba_users():
    """Get users from the Samba user database."""
    stdout = subprocess.check_output(
        ['tdbdump', '/var/lib/samba/private/passdb.tdb']).decode()
    return re.findall(r'USER_(.*)\\0', stdout)


def _try_login_to_ssh(username, password, returncode=0):
    """Return whether the sshpass returncode matches when trying to
    login to ssh using the given username and password"""
    if not action_utils.service_is_running('ssh'):
        return True

    command = [
        'sshpass', '-p', password, 'ssh', '-o', 'UserKnownHostsFile=/dev/null',
        '-o', 'StrictHostKeyChecking=no', '-o', 'VerifyHostKeyDNS=no',
        username + '@127.0.0.1', '/bin/true'
    ]
    process = subprocess.run(command, stdout=subprocess.DEVNULL,
                             stderr=subprocess.DEVNULL, check=False)
    return process.returncode == returncode


@pytest.fixture(name='auto_cleanup_users_groups', autouse=True)
def fixture_auto_cleanup_users_groups(needs_root, load_cfg):
    """Remove all the users and groups created during tests."""
    global _cleanup_users, _cleanup_groups

    _cleanup_users = set()
    _cleanup_groups = set()
    yield

    pytest_admin_exists = PYTEST_ADMIN_USERNAME in _cleanup_users

    for user in _cleanup_users:
        if user == PYTEST_ADMIN_USERNAME:
            continue
        try:
            _delete_user(user)
        except Exception:
            pass

    if pytest_admin_exists:
        try:
            _delete_user(PYTEST_ADMIN_USERNAME)
        except Exception:
            pass

    for group in _cleanup_groups:
        privileged.remove_group(group)


def _create_user(username=None, groups=None):
    """Call the action script for creating a new user."""
    username = username or _random_string()
    password = username + '_passwd'
    admin_user, admin_password = _get_admin_user_password()

    privileged.create_user(username, password, admin_user, admin_password)

    if groups:
        for group in groups:
            admin_user, admin_password = _get_admin_user_password()
            privileged.add_user_to_group(username, group, admin_user,
                                         admin_password)
            if group != 'admin':
                _cleanup_groups.add(group)

    _cleanup_users.add(username)
    return username, password


def _delete_user(username):
    """Utility to delete an LDAP and Samba user"""
    admin_user, admin_password = _get_admin_user_password()
    privileged.remove_user(username, admin_user, admin_password)


def _create_admin_if_does_not_exist():
    """Create a main admin user"""
    admin_user, _ = _get_admin_user_password()
    if not admin_user:
        _create_user(PYTEST_ADMIN_USERNAME, ['admin'])


def _get_admin_user_password():
    """Return an admin username and password."""
    admin_users = privileged.get_group_users('admin')

    if not admin_users:
        return ('', '')

    if test_config.admin_username in admin_users:
        return (test_config.admin_username, test_config.admin_password)

    if PYTEST_ADMIN_USERNAME in admin_users:
        return (PYTEST_ADMIN_USERNAME, PYTEST_ADMIN_USERNAME + '_passwd')

    return (admin_users[0], admin_users[0] + '_passwd')


def _rename_user(old_username, new_username=None):
    """Rename a user."""
    new_username = new_username or _random_string()

    privileged.rename_user(old_username, new_username)
    _cleanup_users.remove(old_username)
    _cleanup_users.add(new_username)
    return new_username


def _create_group(groupname=None):
    groupname = groupname or _random_string()
    privileged.create_group(groupname)
    if groupname != 'admin':
        _cleanup_groups.add(groupname)
    return groupname


def test_create_user():
    """Test whether creating a new user works."""
    _create_admin_if_does_not_exist()

    username, password = _create_user(groups=[_random_string()])

    assert _try_login_to_ssh(username, password)
    assert username in _get_samba_users()
    with pytest.raises(subprocess.CalledProcessError):
        _create_user(username)


def test_create_invalid_user():
    """Test invalid username validation."""
    username = 'invalid/user'
    with pytest.raises(ValueError):
        _create_user(username)


def test_change_user_password():
    """Test changing user password."""
    _create_admin_if_does_not_exist()
    admin_user, admin_password = _get_admin_user_password()

    username, old_password = _create_user()
    old_password_hash = _get_password_hash(username)
    new_password = 'pass $123'

    privileged.set_user_password(username, new_password, admin_user,
                                 admin_password)

    new_password_hash = _get_password_hash(username)
    assert old_password_hash != new_password_hash

    # User can login to ssh using new password but not the old password.
    # sshpass gives a return code of 5 if the password is incorrect.
    assert _try_login_to_ssh(username, old_password, returncode=5)
    assert _try_login_to_ssh(username, new_password)


def test_change_password_as_non_admin_user():
    """Test changing user password as a non-admin user."""
    _create_admin_if_does_not_exist()

    username, old_password = _create_user()
    old_password_hash = _get_password_hash(username)
    new_password = 'pass $123'

    privileged.set_user_password(username, new_password, username,
                                 old_password)

    new_password_hash = _get_password_hash(username)
    assert old_password_hash != new_password_hash

    # User can login to ssh using new password but not the old password.
    # sshpass gives a return code of 5 if the password is incorrect.
    assert _try_login_to_ssh(username, old_password, returncode=5)
    assert _try_login_to_ssh(username, new_password)


def test_change_other_users_password_as_non_admin():
    """Test that changing other user's password as a non-admin user fails."""
    _create_admin_if_does_not_exist()

    username1, password1 = _create_user()
    username2, _ = _create_user()
    new_password = 'pass $123'

    with pytest.raises(PermissionError):
        privileged.set_user_password(username2, new_password, username1,
                                     password1)


def test_set_password_for_non_existent_user():
    """Test setting password for a non-existent user."""
    _create_admin_if_does_not_exist()
    admin_user, admin_password = _get_admin_user_password()

    non_existent_user = _random_string()
    fake_password = _random_string()

    with pytest.raises(subprocess.CalledProcessError):
        privileged.set_user_password(non_existent_user, fake_password,
                                     admin_user, admin_password)


def test_rename_user():
    """Test whether renaming a user works."""
    _create_admin_if_does_not_exist()

    old_username, password = _create_user(groups=['admin', _random_string()])
    old_groups = privileged.get_user_groups(old_username)

    new_username = _rename_user(old_username)
    assert _try_login_to_ssh(new_username, password)
    assert _try_login_to_ssh(old_username, password, returncode=5)
    assert old_username not in _get_samba_users()

    new_groups = privileged.get_user_groups(new_username)
    old_users_groups = privileged.get_user_groups(old_username)
    assert not old_users_groups  # empty
    assert old_groups == new_groups

    with pytest.raises(subprocess.CalledProcessError):
        _rename_user(old_username)

    # Renaming a non-existent user fails
    random_username = _random_string()
    with pytest.raises(subprocess.CalledProcessError):
        _rename_user(random_username, new_username=_random_string())

    # Renaming to an existing user fails
    existing_user, _ = _create_user()
    with pytest.raises(subprocess.CalledProcessError):
        _rename_user(existing_user, new_username=new_username)


def test_rename_invalid_user():
    """Test rename invalid username"""
    invalid_username = 'invalid/user'
    valid_username = _random_string()

    with pytest.raises(ValueError):
        _rename_user(invalid_username, new_username=valid_username)

    with pytest.raises(ValueError):
        _rename_user(valid_username, new_username=invalid_username)


def test_delete_user():
    """Test to check whether LDAP users can be deleted"""
    _create_admin_if_does_not_exist()

    username, password = _create_user(groups=[_random_string()])
    _delete_user(username)
    groups_after = privileged.get_user_groups(username)
    assert not groups_after  # User gets removed from all groups

    # User account cannot be found after deletion
    with pytest.raises(subprocess.CalledProcessError):
        subprocess.run(['ldapid', username], check=True)

    # Deleted user cannot login to ssh
    assert _try_login_to_ssh(username, password, returncode=5)

    assert username not in _get_samba_users()


def test_delete_non_existent_user():
    """Deleting a non-existent user doesn't fail."""
    non_existent_user = _random_string()
    _delete_user(non_existent_user)


def test_delete_invalid_user():
    """Deleting invalid username should fail."""
    invalid_username = 'invalid/user'

    with pytest.raises(ValueError):
        _delete_user(invalid_username)


def test_groups():
    """Test to check that LDAP groups can be deleted"""
    groupname = _random_string()

    _create_group(groupname)
    subprocess.run(['ldapgid', groupname], check=True)

    # create-group is idempotent
    privileged.create_group(groupname)

    privileged.remove_group(groupname)
    with pytest.raises(subprocess.CalledProcessError):
        subprocess.run(['ldapgid', groupname], check=True)

    # delete-group is idempotent
    privileged.remove_group(groupname)


def test_delete_admin_group_fails():
    """Test that deleting the admin group fails."""
    groupname = 'admin'
    _create_group('admin')

    with pytest.raises(ValueError):
        privileged.remove_group(groupname)


def test_user_group_interactions():
    """Test adding/removing user from a groups."""
    _create_admin_if_does_not_exist()
    admin_user, admin_password = _get_admin_user_password()

    group1 = _random_string()
    user1, _ = _create_user(groups=[group1])
    assert [group1] == privileged.get_user_groups(user1)

    # add-user-to-group is not idempotent
    with pytest.raises(subprocess.CalledProcessError):
        privileged.add_user_to_group(user1, group1, admin_user, admin_password)

    # The same user can be added to other new groups
    group2 = _random_string()
    _create_group(group2)
    privileged.add_user_to_group(user1, group2, admin_user, admin_password)

    # Adding a user to a non-existent group creates the group
    group3 = _random_string()
    privileged.add_user_to_group(user1, group3, admin_user, admin_password)
    _cleanup_groups.add(group3)

    # The expected groups got created and the user is part of them.
    expected_groups = [group1, group2, group3]
    assert expected_groups == privileged.get_user_groups(user1)

    # Remove user from group
    group_to_remove_from = random.choice(expected_groups)
    privileged.remove_user_from_group(user1, group_to_remove_from, admin_user,
                                      admin_password)

    # User is no longer in the group that they're removed from
    expected_groups.remove(group_to_remove_from)
    assert expected_groups == privileged.get_user_groups(user1)

    # User cannot be removed from a group that they're not part of
    random_group = _random_string()
    _create_group(random_group)
    with pytest.raises(subprocess.CalledProcessError):
        privileged.remove_user_from_group(user1, random_group, admin_user,
                                          admin_password)