File: login_auth_test_must_run_manually.py

package info (click to toggle)
python-irodsclient 3.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,352 kB
  • sloc: python: 16,650; xml: 525; sh: 104; awk: 5; sql: 3; makefile: 3
file content (610 lines) | stat: -rw-r--r-- 22,772 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
#! /usr/bin/env python

import os
import sys
import tempfile
import unittest
import textwrap
import json
import shutil
import ssl
import irods.test.helpers as helpers
from irods.connection import Connection
from irods.session import iRODSSession, NonAnonymousLoginWithoutPassword
from irods.rule import Rule
from irods.models import User
from socket import gethostname
from irods.password_obfuscation import encode as pw_encode
from irods.connection import PlainTextPAMPasswordError
from irods.access import iRODSAccess
import irods.exception as ex
import contextlib
import socket
from re import compile as regex
import gc
from irods.test.setupssl import create_ssl_dir

#
# Allow override to specify the PAM password in effect for the test rodsuser.
#
TEST_PAM_PW_OVERRIDE = os.environ.get("PYTHON_IRODSCLIENT_TEST_PAM_PW_OVERRIDE", "")
TEST_PAM_PW = TEST_PAM_PW_OVERRIDE or "test123"

TEST_IRODS_PW = "apass"
TEST_RODS_USER = "alissa"


try:
    from re import _pattern_type as regex_type
except ImportError:
    from re import Pattern as regex_type  # Python 3.7+


def json_file_update(fname, keys_to_delete=(), **kw):
    with open(fname, "r") as f:
        j = json.load(f)
    j.update(**kw)
    for k in keys_to_delete:
        if k in j:
            del j[k]
        elif isinstance(k, regex_type):
            jk = [i for i in j.keys() if k.search(i)]
            for ky in jk:
                del j[ky]
    with open(fname, "w") as out:
        json.dump(j, out, indent=4)


def env_dir_fullpath(authtype):
    return os.path.join(os.environ["HOME"], ".irods." + authtype)


def json_env_fullpath(authtype):
    return os.path.join(env_dir_fullpath(authtype), "irods_environment.json")


def secrets_fullpath(authtype):
    return os.path.join(env_dir_fullpath(authtype), ".irodsA")


RODSADMIN_ENV_PATH = os.path.expanduser("~/.irods/irods_environment.json")

SERVER_ENV_SSL_SETTINGS = {
    "irods_ssl_certificate_chain_file": "/etc/irods/ssl/irods.crt",
    "irods_ssl_certificate_key_file": "/etc/irods/ssl/irods.key",
    "irods_ssl_dh_params_file": "/etc/irods/ssl/dhparams.pem",
    "irods_ssl_ca_certificate_file": "/etc/irods/ssl/irods.crt",
    "irods_ssl_verify_server": "cert",
}

CLIENT_OPTIONS_FOR_SSL = {
    "irods_client_server_policy": "CS_NEG_REQUIRE",
    "irods_client_server_negotiation": "request_server_negotiation",
    "irods_ssl_ca_certificate_file": "/etc/irods/ssl/irods.crt",
    "irods_ssl_verify_server": "cert",
    "irods_encryption_key_size": 16,
    "irods_encryption_salt_size": 8,
    "irods_encryption_num_hash_rounds": 16,
    "irods_encryption_algorithm": "AES-256-CBC",
}


def client_env_keys_from_admin_env(user_name, auth_scheme=""):
    cli_env = {}
    with open(RODSADMIN_ENV_PATH) as f:
        srv_env = json.load(f)
        for k in ["irods_host", "irods_zone_name", "irods_port"]:
            cli_env[k] = srv_env[k]
    cli_env["irods_user_name"] = user_name
    if auth_scheme:
        cli_env["irods_authentication_scheme"] = auth_scheme
    return cli_env


@contextlib.contextmanager
def pam_password_in_plaintext(allow=True):
    saved = bool(Connection.DISALLOWING_PAM_PLAINTEXT)
    try:
        Connection.DISALLOWING_PAM_PLAINTEXT = not (allow)
        yield
    finally:
        Connection.DISALLOWING_PAM_PLAINTEXT = saved


class TestLogins(unittest.TestCase):
    """
    Ideally, these tests should move into CI, but that would require the server
    (currently a different node than the client) to have SSL certs created and
    enabled.

    Until then, we require these tests to be run manually on a server node,
    with:

        python -m unittest "irods.test.login_auth_test[.XX[.YY]]'

    Additionally:

      1. The PAM/SSL tests under the TestLogins class should be run on a
         single-node iRODS system, by the service account user. This ensures
         the /etc/irods directory is local and writable.

      2. ./setupssl.py (sets up SSL keys etc. in /etc/irods/ssl) should be run
         first to create (or overwrite, if appropriate) the /etc/irods/ssl directory
         and its contents.

      3. Must add & override configuration entries in /var/lib/irods/irods_environment
         Per https://slides.com/irods/ugm2018-ssl-and-pam-configuration#/3/7

    """

    user_auth_envs = {
        ".irods.pam": {"USER": TEST_RODS_USER, "PASSWORD": TEST_PAM_PW, "AUTH": "pam"},
        ".irods.native": {
            "USER": TEST_RODS_USER,
            "PASSWORD": TEST_IRODS_PW,
            "AUTH": "native",
        },
    }

    env_save = {}

    @contextlib.contextmanager
    def setenv(self, var, newvalue):
        try:
            self.env_save[var] = os.environ.get(var, None)
            os.environ[var] = newvalue
            yield newvalue
        finally:
            oldvalue = self.env_save[var]
            if oldvalue is None:
                del os.environ[var]
            else:
                os.environ[var] = oldvalue

    def create_env_dirs(self):
        dirs = {}
        retval = []
        # -- create environment configurations and secrets
        with pam_password_in_plaintext():
            for dirname, lookup in self.user_auth_envs.items():
                if lookup["AUTH"] in ("pam", "pam_password"):
                    ses = iRODSSession(
                        host=gethostname(),
                        user=lookup["USER"],
                        zone="tempZone",
                        authentication_scheme=lookup["AUTH"],
                        password=lookup["PASSWORD"],
                        port=1247,
                    )
                    try:
                        pam_hashes = ses.pam_pw_negotiated
                    except AttributeError:
                        pam_hashes = []
                    if not pam_hashes:
                        print("Warning ** PAM pw couldnt be generated")
                        break
                    scrambled_pw = pw_encode(pam_hashes[0])
                # elif lookup['AUTH'] == 'XXXXXX': # TODO: insert other authentication schemes here
                elif lookup["AUTH"] in ("native", "", None):
                    scrambled_pw = pw_encode(lookup["PASSWORD"])
                cl_env = client_env_keys_from_admin_env(TEST_RODS_USER)
                if (
                    lookup.get("AUTH", None) is not None
                ):  # - specify auth scheme only if given
                    cl_env["irods_authentication_scheme"] = lookup["AUTH"]
                dirbase = os.path.join(os.environ["HOME"], dirname)
                dirs[dirbase] = {"secrets": scrambled_pw, "client_environment": cl_env}

        # -- create the environment directories and write into them the configurations just created
        for absdir in dirs.keys():
            shutil.rmtree(absdir, ignore_errors=True)
            os.mkdir(absdir)
            with open(os.path.join(absdir, "irods_environment.json"), "w") as envfile:
                envfile.write("{}")
            json_file_update(envfile.name, **dirs[absdir]["client_environment"])
            with open(os.path.join(absdir, ".irodsA"), "w") as secrets_file:
                secrets_file.write(dirs[absdir]["secrets"])
            os.chmod(secrets_file.name, 0o600)

        retval = dirs.keys()
        return retval

    PAM_SCHEME_STRING = "pam"

    @classmethod
    def setUpClass(cls):
        cls.admin = helpers.make_session()
        if cls.admin.server_version >= (4, 3):
            cls.PAM_SCHEME_STRING = cls.user_auth_envs[".irods.pam"]["AUTH"] = (
                "pam_password"
            )

    @classmethod
    def tearDownClass(cls):
        cls.admin.cleanup()

    def setUp(self):
        super(TestLogins, self).setUp()

    def tearDown(self):
        for envdir in getattr(self, "envdirs", []):
            shutil.rmtree(envdir, ignore_errors=True)
        super(TestLogins, self).tearDown()

    def validate_session(self, session, verbose=False, **options):

        # - try to get the home collection
        home_coll = "/{0.zone}/home/{0.username}".format(session)
        self.assertTrue(session.collections.get(home_coll).path == home_coll)
        if verbose:
            print(home_coll)
        # - check user is as expected
        self.assertEqual(session.username, TEST_RODS_USER)
        # - check socket type (normal vs SSL) against whether ssl requested
        use_ssl = options.pop("ssl", None)
        if use_ssl is not None:
            my_connect = [s for s in (session.pool.active | session.pool.idle)][0]
            self.assertEqual(
                bool(use_ssl), my_connect.socket.__class__ is ssl.SSLSocket
            )

    @contextlib.contextmanager
    def _setup_rodsuser_and_optional_pw(self, name, make_irods_pw=False):
        try:
            self.admin.users.create(name, "rodsuser")
            if make_irods_pw:
                self.admin.users.modify(name, "password", TEST_IRODS_PW)
            yield
        finally:
            self.admin.users.remove(name)

    def tst0(
        self, ssl_opt, auth_opt, env_opt, name=TEST_RODS_USER, make_irods_pw=False
    ):
        _auth_opt = auth_opt
        if auth_opt in ("pam", "pam_password"):
            auth_opt = self.PAM_SCHEME_STRING
        with self._setup_rodsuser_and_optional_pw(
            name=name, make_irods_pw=make_irods_pw
        ):
            self.envdirs = self.create_env_dirs()
            if not self.envdirs:
                raise RuntimeError("Could not create one or more client environments")
            auth_opt_explicit = "native" if _auth_opt == "" else _auth_opt
            verbosity = False
            # verbosity='' # -- debug - sanity check by printing out options applied
            out = {"": ""}
            if env_opt:
                with self.setenv(
                    "IRODS_ENVIRONMENT_FILE", json_env_fullpath(auth_opt_explicit)
                ) as env_file, self.setenv(
                    "IRODS_AUTHENTICATION_FILE", secrets_fullpath(auth_opt_explicit)
                ):
                    cli_env_extras = (
                        {} if not (ssl_opt) else dict(CLIENT_OPTIONS_FOR_SSL)
                    )
                    if auth_opt:
                        cli_env_extras.update(irods_authentication_scheme=auth_opt)
                        remove = []
                    else:
                        remove = [regex("authentication_")]
                    with helpers.file_backed_up(env_file):
                        json_file_update(
                            env_file, keys_to_delete=remove, **cli_env_extras
                        )
                        session = iRODSSession(irods_env_file=env_file)
                        with open(env_file) as f:
                            out = json.load(f)
                        self.validate_session(session, verbose=verbosity, ssl=ssl_opt)
                        session.cleanup()
                out["ARGS"] = "no"
            else:
                session_options = {}
                if auth_opt:
                    session_options.update(authentication_scheme=auth_opt)
                if ssl_opt:
                    SSL_cert = CLIENT_OPTIONS_FOR_SSL["irods_ssl_ca_certificate_file"]
                    session_options.update(
                        ssl_context=ssl.create_default_context(
                            purpose=ssl.Purpose.SERVER_AUTH,
                            capath=None,
                            cadata=None,
                            cafile=SSL_cert,
                        ),
                        **CLIENT_OPTIONS_FOR_SSL
                    )
                lookup = self.user_auth_envs[
                    ".irods." + ("native" if not (_auth_opt) else _auth_opt)
                ]
                session = iRODSSession(
                    host=gethostname(),
                    user=lookup["USER"],
                    zone="tempZone",
                    password=lookup["PASSWORD"],
                    port=1247,
                    **session_options
                )
                out = session_options
                self.validate_session(session, verbose=verbosity, ssl=ssl_opt)
                session.cleanup()
                out["ARGS"] = "yes"

            if verbosity == "":
                print("--- ssl:", ssl_opt, "/ auth:", repr(auth_opt), "/ env:", env_opt)
                print(
                    "--- > ",
                    json.dumps(
                        {k: v for k, v in out.items() if k != "ssl_context"}, indent=4
                    ),
                )
                print("---")

    # == test defaulting to 'native'

    def test_01(self):
        self.tst0(ssl_opt=True, auth_opt="", env_opt=False, make_irods_pw=True)

    def test_02(self):
        self.tst0(ssl_opt=False, auth_opt="", env_opt=False, make_irods_pw=True)

    def test_03(self):
        self.tst0(ssl_opt=True, auth_opt="", env_opt=True, make_irods_pw=True)

    def test_04(self):
        self.tst0(ssl_opt=False, auth_opt="", env_opt=True, make_irods_pw=True)

    # == test explicit scheme 'native'

    def test_1(self):
        self.tst0(ssl_opt=True, auth_opt="native", env_opt=False, make_irods_pw=True)

    def test_2(self):
        self.tst0(ssl_opt=False, auth_opt="native", env_opt=False, make_irods_pw=True)

    def test_3(self):
        self.tst0(ssl_opt=True, auth_opt="native", env_opt=True, make_irods_pw=True)

    def test_4(self):
        self.tst0(ssl_opt=False, auth_opt="native", env_opt=True, make_irods_pw=True)

    # == test explicit scheme 'pam'

    def test_5(self):
        self.tst0(ssl_opt=True, auth_opt="pam", env_opt=False)

    def test_6(self):
        try:
            self.tst0(ssl_opt=False, auth_opt="pam", env_opt=False)
        except PlainTextPAMPasswordError:
            pass
        else:
            # -- no exception raised
            self.fail("PlainTextPAMPasswordError should have been raised")

    def test_7(self):
        self.tst0(ssl_opt=True, auth_opt="pam", env_opt=True)

    def test_8(self):
        self.tst0(ssl_opt=False, auth_opt="pam", env_opt=True)

    @unittest.skipUnless(
        TEST_PAM_PW_OVERRIDE,
        "Skipping unless pam password is overridden (e.g. to test special characters)",
    )
    def test_escaped_pam_password_chars__362(self):
        with self._setup_rodsuser_and_optional_pw(name=TEST_RODS_USER):
            context = ssl._create_unverified_context(
                purpose=ssl.Purpose.SERVER_AUTH,
                capath=None,
                cadata=None,
                cafile=None,
            )
            ssl_settings = {
                "client_server_negotiation": "request_server_negotiation",
                "client_server_policy": "CS_NEG_REQUIRE",
                "encryption_algorithm": "AES-256-CBC",
                "encryption_key_size": 32,
                "encryption_num_hash_rounds": 16,
                "encryption_salt_size": 8,
                "ssl_ca_certificate_file": "/etc/irods/ssl/irods.crt",
                "ssl_context": context,
            }
            irods_session = iRODSSession(
                host=self.admin.host,
                port=self.admin.port,
                zone=self.admin.zone,
                user=TEST_RODS_USER,
                password=TEST_PAM_PW_OVERRIDE,
                authentication_scheme="pam",
                **ssl_settings
            )
            home_coll = "/{0.zone}/home/{0.username}".format(irods_session)
            self.assertEqual(irods_session.collections.get(home_coll).path, home_coll)


class TestAnonymousUser(unittest.TestCase):

    def setUp(self):
        admin = self.admin = helpers.make_session()

        user = self.user = admin.users.create("anonymous", "rodsuser", admin.zone)
        self.home = "/{admin.zone}/home/{user.name}".format(**locals())

        admin.collections.create(self.home)
        acl = iRODSAccess("own", self.home, user.name)
        admin.acls.set(acl, admin=True)

        self.env_file = os.path.expanduser("~/.irods.anon/irods_environment.json")
        self.env_dir = os.path.dirname(self.env_file)
        self.auth_file = os.path.expanduser("~/.irods.anon/.irodsA")
        os.mkdir(os.path.dirname(self.env_file))
        json.dump(
            {
                "irods_host": admin.host,
                "irods_port": admin.port,
                "irods_user_name": user.name,
                "irods_zone_name": admin.zone,
            },
            open(self.env_file, "w"),
            indent=4,
        )

    def tearDown(self):
        self.admin.collections.remove(self.home, recurse=True, force=True)
        self.admin.users.remove(self.user.name)
        shutil.rmtree(self.env_dir, ignore_errors=True)

    def test_login_from_environment(self):
        orig_env = os.environ.copy()
        try:
            os.environ["IRODS_ENVIRONMENT_FILE"] = self.env_file
            os.environ["IRODS_AUTHENTICATION_FILE"] = self.auth_file
            ses = helpers.make_session()
            ses.collections.get(self.home)
        finally:
            os.environ.clear()
            os.environ.update(orig_env)


class TestMiscellaneous(unittest.TestCase):

    def test_nonanonymous_login_without_auth_file_fails__290(self):
        ses = self.admin
        if ses.users.get(ses.username).type != "rodsadmin":
            self.skipTest("Only a rodsadmin may run this test.")
        try:
            ENV_DIR = tempfile.mkdtemp()
            ses.users.create("bob", "rodsuser")
            ses.users.modify("bob", "password", "bpass")
            d = dict(
                password="bpass",
                user="bob",
                host=ses.host,
                port=ses.port,
                zone=ses.zone,
            )
            (bob_env, bob_auth) = helpers.make_environment_and_auth_files(ENV_DIR, **d)
            login_options = {
                "irods_env_file": bob_env,
                "irods_authentication_file": bob_auth,
            }
            with helpers.make_session(**login_options) as s:
                s.users.get("bob")
            os.unlink(bob_auth)
            # -- Check that we raise an appropriate exception pointing to the missing auth file path --
            with self.assertRaisesRegexp(NonAnonymousLoginWithoutPassword, bob_auth):
                with helpers.make_session(**login_options) as s:
                    s.users.get("bob")
        finally:
            try:
                shutil.rmtree(ENV_DIR, ignore_errors=True)
                ses.users.get("bob").remove()
            except ex.UserDoesNotExist:
                pass

    def setUp(self):
        admin = self.admin = helpers.make_session()
        if admin.users.get(admin.username).type != "rodsadmin":
            self.skipTest("need admin privilege")
        admin.users.create("alice", "rodsuser")

    def tearDown(self):
        self.admin.users.remove("alice")
        self.admin.cleanup()

    def test_destruct_session_with_no_pool_315(self):

        destruct_flag = [False]

        class mySess(iRODSSession):
            def __del__(self):
                self.pool = None
                super(mySess, self).__del__()  # call parent destructor(s) - will raise
                # an error before the #315 fix
                destruct_flag[:] = [True]

        admin = self.admin
        admin.users.modify("alice", "password", "apass")

        my_sess = mySess(
            user="alice",
            password="apass",
            host=admin.host,
            port=admin.port,
            zone=admin.zone,
        )
        my_sess.cleanup()
        del my_sess
        gc.collect()
        self.assertEqual(destruct_flag, [True])

    def test_non_anon_native_login_omitting_password_fails_1__290(self):
        # rodsuser with password unset
        with self.assertRaises(ex.CAT_INVALID_USER):
            self._non_anon_native_login_omitting_password_fails_N__290()

    def test_non_anon_native_login_omitting_password_fails_2__290(self):
        # rodsuser with a password set
        self.admin.users.modify("alice", "password", "apass")
        with self.assertRaises(ex.CAT_INVALID_AUTHENTICATION):
            self._non_anon_native_login_omitting_password_fails_N__290()

    def _non_anon_native_login_omitting_password_fails_N__290(self):
        admin = self.admin
        with iRODSSession(
            zone=admin.zone, port=admin.port, host=admin.host, user="alice"
        ) as alice:
            alice.collections.get(helpers.home_collection(alice))


class TestWithSSL(unittest.TestCase):
    """
    The tests within this class should be run by an account other than the
    service account.  Otherwise there is risk of corrupting the server setup.
    """

    def setUp(self):
        if os.path.expanduser("~") == "/var/lib/irods":
            self.skipTest("TestWithSSL may not be run by user irods")
        if not os.path.exists("/etc/irods/ssl"):
            self.skipTest(
                "Running setupssl.py as irods user is prerequisite for this test."
            )
        with helpers.make_session() as session:
            if not session.host in ("localhost", socket.gethostname()):
                self.skipTest("Test must be run co-resident with server")

    def test_ssl_with_server_verify_set_to_none_281(self):
        env_file = os.path.expanduser("~/.irods/irods_environment.json")
        my_ssl_directory = ""
        try:
            with helpers.file_backed_up(env_file):
                with open(env_file) as env_file_handle:
                    env = json.load(env_file_handle)
                my_ssl_directory = tempfile.mkdtemp(dir=os.path.expanduser("~"))
                # Elect for efficiency in DH param generation, eg. when setting up for testing.
                create_ssl_dir(
                    ssl_dir=my_ssl_directory, use_strong_primes_for_dh_generation=False
                )
                settings_to_update = {
                    key: value.replace("/etc/irods/ssl", my_ssl_directory)
                    for key, value in env.items()
                    if type(value) is str and value.startswith("/etc/irods/ssl")
                }
                settings_to_update["irods_ssl_verify_server"] = "none"
                env.update(settings_to_update)
                with open(env_file, "w") as f:
                    json.dump(env, f)
                with helpers.make_session() as session:
                    session.collections.get(
                        "/{session.zone}/home/{session.username}".format(**locals())
                    )
        finally:
            if my_ssl_directory:
                shutil.rmtree(my_ssl_directory)


if __name__ == "__main__":
    # let the tests find the parent irods lib
    sys.path.insert(0, os.path.abspath("../.."))
    unittest.main()