File: test_authorizers.py

package info (click to toggle)
python-pyftpdlib 2.0.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,232 kB
  • sloc: python: 10,362; makefile: 346
file content (624 lines) | stat: -rw-r--r-- 23,131 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
# Copyright (C) 2007 Giampaolo Rodola' <g.rodola@gmail.com>.
# Use of this source code is governed by MIT license that can be
# found in the LICENSE file.

import os
import random
import string
import sys
import warnings

import pytest

from pyftpdlib.authorizers import AuthenticationFailed
from pyftpdlib.authorizers import AuthorizerError
from pyftpdlib.authorizers import DummyAuthorizer

from . import HOME
from . import PASSWD
from . import POSIX
from . import USER
from . import WINDOWS
from . import PyftpdlibTestCase
from . import touch


if POSIX:
    import pwd

    try:
        from pyftpdlib.authorizers import UnixAuthorizer
    except ImportError:
        UnixAuthorizer = None
else:
    UnixAuthorizer = None

if WINDOWS:
    from pywintypes import error as Win32ExtError

    from pyftpdlib.authorizers import WindowsAuthorizer
else:
    WindowsAuthorizer = None


class TestDummyAuthorizer(PyftpdlibTestCase):
    """Tests for DummyAuthorizer class."""

    # temporarily change warnings to exceptions for the purposes of testing
    def setUp(self):
        super().setUp()
        self.tempdir = os.path.abspath(self.get_testfn())
        self.subtempdir = os.path.join(self.tempdir, self.get_testfn())
        self.tempfile = os.path.join(self.tempdir, self.get_testfn())
        self.subtempfile = os.path.join(self.subtempdir, self.get_testfn())
        os.mkdir(self.tempdir)
        os.mkdir(self.subtempdir)
        touch(self.tempfile)
        touch(self.subtempfile)
        warnings.filterwarnings("error")

    def tearDown(self):
        os.remove(self.tempfile)
        os.remove(self.subtempfile)
        os.rmdir(self.subtempdir)
        os.rmdir(self.tempdir)
        warnings.resetwarnings()
        super().tearDown()

    def test_common_methods(self):
        auth = DummyAuthorizer()
        # create user
        auth.add_user(USER, PASSWD, HOME)
        auth.add_anonymous(HOME)
        # check credentials
        auth.validate_authentication(USER, PASSWD, None)
        with pytest.raises(AuthenticationFailed):
            auth.validate_authentication(USER, 'wrongpwd', None)
        auth.validate_authentication('anonymous', 'foo', None)
        auth.validate_authentication('anonymous', '', None)  # empty passwd
        # remove them
        auth.remove_user(USER)
        auth.remove_user('anonymous')
        # raise exc if user does not exists
        with pytest.raises(KeyError):
            auth.remove_user(USER)
        # raise exc if path does not exist
        with pytest.raises(ValueError, match='no such directory'):
            auth.add_user(USER, PASSWD, '?:\\')
        with pytest.raises(ValueError, match='no such directory'):
            auth.add_anonymous('?:\\')
        # raise exc if user already exists
        auth.add_user(USER, PASSWD, HOME)
        auth.add_anonymous(HOME)
        with pytest.raises(ValueError, match=f'user {USER!r} already exists'):
            auth.add_user(USER, PASSWD, HOME)
        with pytest.raises(
            ValueError, match="user 'anonymous' already exists"
        ):
            auth.add_anonymous(HOME)
        auth.remove_user(USER)
        auth.remove_user('anonymous')
        # raise on wrong permission
        with pytest.raises(ValueError, match="no such permission"):
            auth.add_user(USER, PASSWD, HOME, perm='?')
        with pytest.raises(ValueError, match="no such permission"):
            auth.add_anonymous(HOME, perm='?')
        # expect warning on write permissions assigned to anonymous user
        for x in "adfmw":
            with pytest.raises(
                RuntimeWarning,
                match="write permissions assigned to anonymous user.",
            ):
                auth.add_anonymous(HOME, perm=x)

    def test_override_perm_interface(self):
        auth = DummyAuthorizer()
        auth.add_user(USER, PASSWD, HOME, perm='elr')
        # raise exc if user does not exists
        with pytest.raises(KeyError):
            auth.override_perm(USER + 'w', HOME, 'elr')
        # raise exc if path does not exist or it's not a directory
        with pytest.raises(ValueError, match='no such directory'):
            auth.override_perm(USER, '?:\\', 'elr')
        with pytest.raises(ValueError, match='no such directory'):
            auth.override_perm(USER, self.tempfile, 'elr')
        # raise on wrong permission
        with pytest.raises(ValueError, match="no such permission"):
            auth.override_perm(USER, HOME, perm='?')
        # expect warning on write permissions assigned to anonymous user
        auth.add_anonymous(HOME)
        for p in "adfmw":
            with pytest.raises(
                RuntimeWarning,
                match="write permissions assigned to anonymous user.",
            ):
                auth.override_perm('anonymous', HOME, p)
        # raise on attempt to override home directory permissions
        with pytest.raises(
            ValueError, match="can't override home directory permissions"
        ):
            auth.override_perm(USER, HOME, perm='w')
        # raise on attempt to override a path escaping home directory
        if os.path.dirname(HOME) != HOME:
            with pytest.raises(
                ValueError, match="path escapes user home directory"
            ):
                auth.override_perm(USER, os.path.dirname(HOME), perm='w')
        # try to re-set an overridden permission
        auth.override_perm(USER, self.tempdir, perm='w')
        auth.override_perm(USER, self.tempdir, perm='wr')

    def test_override_perm_recursive_paths(self):
        auth = DummyAuthorizer()
        auth.add_user(USER, PASSWD, HOME, perm='elr')
        assert not auth.has_perm(USER, 'w', self.tempdir)
        auth.override_perm(USER, self.tempdir, perm='w', recursive=True)
        assert not auth.has_perm(USER, 'w', HOME)
        assert auth.has_perm(USER, 'w', self.tempdir)
        assert auth.has_perm(USER, 'w', self.tempfile)
        assert auth.has_perm(USER, 'w', self.subtempdir)
        assert auth.has_perm(USER, 'w', self.subtempfile)

        assert not auth.has_perm(USER, 'w', HOME + '@')
        assert not auth.has_perm(USER, 'w', self.tempdir + '@')
        path = os.path.join(
            self.tempdir + '@', os.path.basename(self.tempfile)
        )
        assert not auth.has_perm(USER, 'w', path)
        # test case-sensitiveness
        if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'):
            assert auth.has_perm(USER, 'w', self.tempdir.upper())

    def test_override_perm_not_recursive_paths(self):
        auth = DummyAuthorizer()
        auth.add_user(USER, PASSWD, HOME, perm='elr')
        assert not auth.has_perm(USER, 'w', self.tempdir)
        auth.override_perm(USER, self.tempdir, perm='w')
        assert not auth.has_perm(USER, 'w', HOME)
        assert auth.has_perm(USER, 'w', self.tempdir)
        assert auth.has_perm(USER, 'w', self.tempfile)
        assert not auth.has_perm(USER, 'w', self.subtempdir)
        assert not auth.has_perm(USER, 'w', self.subtempfile)

        assert not auth.has_perm(USER, 'w', HOME + '@')
        assert not auth.has_perm(USER, 'w', self.tempdir + '@')
        path = os.path.join(
            self.tempdir + '@', os.path.basename(self.tempfile)
        )
        assert not auth.has_perm(USER, 'w', path)
        # test case-sensitiveness
        if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'):
            assert auth.has_perm(USER, 'w', self.tempdir.upper())


class _SharedAuthorizerTests:
    """Tests valid for both UnixAuthorizer and WindowsAuthorizer for
    those parts which share the same API.
    """

    authorizer_class = None
    # --- utils

    def get_users(self):
        return self.authorizer_class._get_system_users()

    @staticmethod
    def get_current_user():
        if POSIX:
            return pwd.getpwuid(os.getuid()).pw_name
        else:
            return os.environ['USERNAME']

    @staticmethod
    def get_current_user_homedir():
        if POSIX:
            return pwd.getpwuid(os.getuid()).pw_dir
        else:
            return os.environ['USERPROFILE']

    def get_nonexistent_user(self):
        # return a user which does not exist on the system
        users = self.get_users()
        letters = string.ascii_lowercase
        while True:
            user = ''.join([random.choice(letters) for i in range(10)])
            if user not in users:
                return user

    def assertRaisesWithMsg(self, excClass, msg, callableObj, *args, **kwargs):
        try:
            callableObj(*args, **kwargs)
        except excClass as err:
            if str(err) == msg:
                return
            raise self.failureException(f"{err!s} != {msg}")
        else:
            if hasattr(excClass, '__name__'):
                excName = excClass.__name__
            else:
                excName = str(excClass)
            raise self.failureException(f"{excName} not raised")

    # --- /utils

    def test_get_home_dir(self):
        auth = self.authorizer_class()
        home = auth.get_home_dir(self.get_current_user())
        nonexistent_user = self.get_nonexistent_user()
        assert os.path.isdir(home)
        if auth.has_user('nobody'):
            home = auth.get_home_dir('nobody')
        with pytest.raises(AuthorizerError):
            auth.get_home_dir(nonexistent_user)

    def test_has_user(self):
        auth = self.authorizer_class()
        current_user = self.get_current_user()
        nonexistent_user = self.get_nonexistent_user()
        assert auth.has_user(current_user)
        assert not auth.has_user(nonexistent_user)
        auth = self.authorizer_class(rejected_users=[current_user])
        assert not auth.has_user(current_user)

    def test_validate_authentication(self):
        # can't test for actual success in case of valid authentication
        # here as we don't have the user password
        if self.authorizer_class.__name__ == 'UnixAuthorizer':
            auth = self.authorizer_class(require_valid_shell=False)
        else:
            auth = self.authorizer_class()
        current_user = self.get_current_user()
        nonexistent_user = self.get_nonexistent_user()
        with pytest.raises(AuthenticationFailed):
            auth.validate_authentication(
                current_user,
                'wrongpasswd',
                None,
            )
        with pytest.raises(AuthenticationFailed):
            auth.validate_authentication(
                nonexistent_user,
                'bar',
                None,
            )

    def test_impersonate_user(self):
        auth = self.authorizer_class()
        nonexistent_user = self.get_nonexistent_user()
        try:
            if self.authorizer_class.__name__ == 'UnixAuthorizer':
                auth.impersonate_user(self.get_current_user(), '')
                with pytest.raises(AuthorizerError):
                    auth.impersonate_user(
                        nonexistent_user,
                        'pwd',
                    )
            else:
                with pytest.raises(Win32ExtError):
                    auth.impersonate_user(
                        nonexistent_user,
                        'pwd',
                    )
                with pytest.raises(Win32ExtError):
                    auth.impersonate_user(
                        self.get_current_user(),
                        '',
                    )
        finally:
            auth.terminate_impersonation('')

    def test_terminate_impersonation(self):
        auth = self.authorizer_class()
        auth.terminate_impersonation('')
        auth.terminate_impersonation('')

    def test_get_perms(self):
        auth = self.authorizer_class(global_perm='elr')
        assert 'r' in auth.get_perms(self.get_current_user())
        assert 'w' not in auth.get_perms(self.get_current_user())

    def test_has_perm(self):
        auth = self.authorizer_class(global_perm='elr')
        assert auth.has_perm(self.get_current_user(), 'r')
        assert not auth.has_perm(self.get_current_user(), 'w')

    def test_messages(self):
        auth = self.authorizer_class(msg_login="login", msg_quit="quit")
        assert auth.get_msg_login, "login"
        assert auth.get_msg_quit, "quit"

    def test_error_options(self):
        wrong_user = self.get_nonexistent_user()
        self.assertRaisesWithMsg(
            AuthorizerError,
            "rejected_users and allowed_users options are mutually exclusive",
            self.authorizer_class,
            allowed_users=['foo'],
            rejected_users=['bar'],
        )
        self.assertRaisesWithMsg(
            AuthorizerError,
            'invalid username "anonymous"',
            self.authorizer_class,
            allowed_users=['anonymous'],
        )
        self.assertRaisesWithMsg(
            AuthorizerError,
            'invalid username "anonymous"',
            self.authorizer_class,
            rejected_users=['anonymous'],
        )
        self.assertRaisesWithMsg(
            AuthorizerError,
            f'unknown user {wrong_user}',
            self.authorizer_class,
            allowed_users=[wrong_user],
        )
        self.assertRaisesWithMsg(
            AuthorizerError,
            f'unknown user {wrong_user}',
            self.authorizer_class,
            rejected_users=[wrong_user],
        )

    def test_override_user_password(self):
        auth = self.authorizer_class()
        user = self.get_current_user()
        auth.override_user(user, password='foo')
        auth.validate_authentication(user, 'foo', None)
        with pytest.raises(AuthenticationFailed):
            auth.validate_authentication(
                user,
                'bar',
                None,
            )
        # make sure other settings keep using default values
        assert auth.get_home_dir(user) == self.get_current_user_homedir()
        assert auth.get_perms(user) == "elradfmwMT"
        assert auth.get_msg_login(user) == "Login successful."
        assert auth.get_msg_quit(user) == "Goodbye."

    def test_override_user_homedir(self):
        auth = self.authorizer_class()
        user = self.get_current_user()
        dir = os.path.dirname(os.getcwd())
        auth.override_user(user, homedir=dir)
        assert auth.get_home_dir(user) == dir
        # make sure other settings keep using default values
        # self.assertEqual(auth.get_home_dir(user),
        #                  self.get_current_user_homedir())
        assert auth.get_perms(user) == "elradfmwMT"
        assert auth.get_msg_login(user) == "Login successful."
        assert auth.get_msg_quit(user) == "Goodbye."

    def test_override_user_perm(self):
        auth = self.authorizer_class()
        user = self.get_current_user()
        auth.override_user(user, perm="elr")
        assert auth.get_perms(user) == "elr"
        # make sure other settings keep using default values
        assert auth.get_home_dir(user) == self.get_current_user_homedir()
        # self.assertEqual(auth.get_perms(user), "elradfmwMT")
        assert auth.get_msg_login(user) == "Login successful."
        assert auth.get_msg_quit(user) == "Goodbye."

    def test_override_user_msg_login_quit(self):
        auth = self.authorizer_class()
        user = self.get_current_user()
        auth.override_user(user, msg_login="foo", msg_quit="bar")
        assert auth.get_msg_login(user) == "foo"
        assert auth.get_msg_quit(user) == "bar"
        # make sure other settings keep using default values
        assert auth.get_home_dir(user) == self.get_current_user_homedir()
        assert auth.get_perms(user) == "elradfmwMT"
        # self.assertEqual(auth.get_msg_login(user), "Login successful.")
        # self.assertEqual(auth.get_msg_quit(user), "Goodbye.")

    def test_override_user_errors(self):
        if self.authorizer_class.__name__ == 'UnixAuthorizer':
            auth = self.authorizer_class(require_valid_shell=False)
        else:
            auth = self.authorizer_class()
        this_user = self.get_current_user()
        for x in self.get_users():
            if x != this_user:
                another_user = x
                break
        nonexistent_user = self.get_nonexistent_user()
        self.assertRaisesWithMsg(
            AuthorizerError,
            "at least one keyword argument must be specified",
            auth.override_user,
            this_user,
        )
        self.assertRaisesWithMsg(
            AuthorizerError,
            f'no such user {nonexistent_user}',
            auth.override_user,
            nonexistent_user,
            perm='r',
        )
        if self.authorizer_class.__name__ == 'UnixAuthorizer':
            auth = self.authorizer_class(
                allowed_users=[this_user], require_valid_shell=False
            )
        else:
            auth = self.authorizer_class(allowed_users=[this_user])
        auth.override_user(this_user, perm='r')
        self.assertRaisesWithMsg(
            AuthorizerError,
            f'{another_user} is not an allowed user',
            auth.override_user,
            another_user,
            perm='r',
        )
        if self.authorizer_class.__name__ == 'UnixAuthorizer':
            auth = self.authorizer_class(
                rejected_users=[this_user], require_valid_shell=False
            )
        else:
            auth = self.authorizer_class(rejected_users=[this_user])
        auth.override_user(another_user, perm='r')
        self.assertRaisesWithMsg(
            AuthorizerError,
            f'{this_user} is not an allowed user',
            auth.override_user,
            this_user,
            perm='r',
        )
        self.assertRaisesWithMsg(
            AuthorizerError,
            "can't assign password to anonymous user",
            auth.override_user,
            "anonymous",
            password='foo',
        )


# =====================================================================
# --- UNIX authorizer
# =====================================================================


@pytest.mark.skipif(not POSIX, reason="UNIX only")
@pytest.mark.skipif(
    UnixAuthorizer is None, reason="UnixAuthorizer class not available"
)
class TestUnixAuthorizer(_SharedAuthorizerTests, PyftpdlibTestCase):
    """Unix authorizer specific tests."""

    authorizer_class = UnixAuthorizer

    def setUp(self):
        super().setUp()
        try:
            UnixAuthorizer()
        except AuthorizerError:  # not root
            self.skipTest("need root access")

    def test_get_perms_anonymous(self):
        auth = UnixAuthorizer(
            global_perm='elr', anonymous_user=self.get_current_user()
        )
        assert 'e' in auth.get_perms('anonymous')
        assert 'w' not in auth.get_perms('anonymous')
        warnings.filterwarnings("ignore")
        auth.override_user('anonymous', perm='w')
        warnings.resetwarnings()
        assert 'w' in auth.get_perms('anonymous')

    def test_has_perm_anonymous(self):
        auth = UnixAuthorizer(
            global_perm='elr', anonymous_user=self.get_current_user()
        )
        assert auth.has_perm(self.get_current_user(), 'r')
        assert not auth.has_perm(self.get_current_user(), 'w')
        assert auth.has_perm('anonymous', 'e')
        assert not auth.has_perm('anonymous', 'w')
        warnings.filterwarnings("ignore")
        auth.override_user('anonymous', perm='w')
        warnings.resetwarnings()
        assert auth.has_perm('anonymous', 'w')

    def test_validate_authentication(self):
        # we can only test for invalid credentials
        auth = UnixAuthorizer(require_valid_shell=False)
        with pytest.raises(AuthenticationFailed):
            auth.validate_authentication(
                '?!foo',
                '?!foo',
                None,
            )
        auth = UnixAuthorizer(require_valid_shell=True)
        with pytest.raises(AuthenticationFailed):
            auth.validate_authentication(
                '?!foo',
                '?!foo',
                None,
            )

    def test_validate_authentication_anonymous(self):
        current_user = self.get_current_user()
        auth = UnixAuthorizer(
            anonymous_user=current_user, require_valid_shell=False
        )
        with pytest.raises(AuthenticationFailed):
            auth.validate_authentication(
                'foo',
                'passwd',
                None,
            )
        with pytest.raises(AuthenticationFailed):
            auth.validate_authentication(
                current_user,
                'passwd',
                None,
            )
        auth.validate_authentication('anonymous', 'passwd', None)

    def test_require_valid_shell(self):

        def get_fake_shell_user():
            for user in self.get_users():
                shell = pwd.getpwnam(user).pw_shell
                # On linux fake shell is usually /bin/false, on
                # freebsd /usr/sbin/nologin;  in case of other
                # UNIX variants test needs to be adjusted.
                if '/false' in shell or '/nologin' in shell:
                    return user
            self.fail("no user found")

        user = get_fake_shell_user()
        self.assertRaisesWithMsg(
            AuthorizerError,
            f"user {user} has not a valid shell",
            UnixAuthorizer,
            allowed_users=[user],
        )
        # commented as it first fails for invalid home
        # self.assertRaisesWithMsg(
        #     ValueError,
        #     "user %s has not a valid shell" % user,
        #     UnixAuthorizer, anonymous_user=user)
        auth = UnixAuthorizer()
        assert auth._has_valid_shell(self.get_current_user())
        assert not auth._has_valid_shell(user)
        self.assertRaisesWithMsg(
            AuthorizerError,
            f"User {user} doesn't have a valid shell.",
            auth.override_user,
            user,
            perm='r',
        )

    def test_not_root(self):
        # UnixAuthorizer is supposed to work only as super user
        auth = self.authorizer_class()
        try:
            auth.impersonate_user('nobody', '')
            self.assertRaisesWithMsg(
                AuthorizerError,
                "super user privileges are required",
                UnixAuthorizer,
            )
        finally:
            auth.terminate_impersonation('nobody')


# =====================================================================
# --- Windows authorizer
# =====================================================================


@pytest.mark.skipif(not WINDOWS, reason="Windows only")
class TestWindowsAuthorizer(_SharedAuthorizerTests, PyftpdlibTestCase):
    """Windows authorizer specific tests."""

    authorizer_class = WindowsAuthorizer

    def test_wrong_anonymous_credentials(self):
        user = self.get_current_user()
        with pytest.raises(Win32ExtError):
            self.authorizer_class(
                anonymous_user=user, anonymous_password='$|1wrongpasswd'
            )